1use crate::artifacts;
6use crate::cancel::NamedFutureExt;
7use crate::diagnostics::{self, LogCollectionOutcome};
8use crate::outcome::{ConnectionError, RunTestSuiteError, UnexpectedEventError};
9use crate::output::{
10 ArtifactType, DirectoryArtifactType, DynDirectoryArtifact, DynReporter, EntityReporter,
11};
12use crate::stream_util::StreamUtil;
13use anyhow::{Context as _, anyhow};
14use flex_client::Peered;
15use futures::AsyncReadExt;
16use futures::future::{BoxFuture, FutureExt, TryFutureExt, join_all};
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use log::{debug, warn};
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::io::Write;
22use std::path::PathBuf;
23
24#[cfg(feature = "fdomain")]
25use test_diagnostics_fdomain as test_diagnostics;
26
27use flex_fuchsia_io as fio;
28use flex_fuchsia_test_manager as ftest_manager;
29use fuchsia_async as fasync;
30use test_diagnostics::zstd_compress::Decoder;
31
32pub(crate) async fn drain_artifact<'a, E, T>(
40 reporter: &'a EntityReporter<E, T>,
41 artifact: ftest_manager::Artifact,
42 log_opts: diagnostics::LogCollectionOptions,
43) -> Result<
44 BoxFuture<'static, Result<Option<LogCollectionOutcome>, anyhow::Error>>,
45 RunTestSuiteError,
46>
47where
48 T: Borrow<DynReporter>,
49{
50 match artifact {
51 ftest_manager::Artifact::Stdout(socket) => {
52 let stdout = reporter.new_artifact(&ArtifactType::Stdout)?;
53 Ok(copy_socket_artifact(socket, stdout).map_ok(|_| None).named("stdout").boxed())
54 }
55 ftest_manager::Artifact::Stderr(socket) => {
56 let stderr = reporter.new_artifact(&ArtifactType::Stderr)?;
57 Ok(copy_socket_artifact(socket, stderr).map_ok(|_| None).named("stderr").boxed())
58 }
59 ftest_manager::Artifact::Log(syslog) => {
60 let syslog_artifact = reporter.new_artifact(&ArtifactType::Syslog)?;
61 Ok(diagnostics::collect_logs(
62 test_diagnostics::LogStream::from_syslog(syslog)
63 .map_err(|e| RunTestSuiteError::Connection(ConnectionError(e.into())))?,
64 syslog_artifact,
65 log_opts,
66 )
67 .map_ok(Some)
68 .named("syslog")
69 .boxed())
70 }
71 ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
72 directory_and_token,
73 component_moniker,
74 ..
75 }) => {
76 let ftest_manager::DirectoryAndToken { directory, token, .. } = directory_and_token
77 .ok_or(UnexpectedEventError::MissingRequiredField {
78 containing_struct: "CustomArtifact",
79 field: "directory_and_token",
80 })?;
81 let directory_artifact = reporter
82 .new_directory_artifact(&DirectoryArtifactType::Custom, component_moniker)?;
83 Ok(async move {
84 let directory = directory.into_proxy();
85 let result =
86 artifacts::copy_custom_artifact_directory(directory, directory_artifact).await;
87 let _ = token.signal_peer(fidl::Signals::empty(), fidl::Signals::USER_0);
91 result
92 }
93 .map_ok(|()| None)
94 .named("custom_artifacts")
95 .boxed())
96 }
97 ftest_manager::Artifact::DebugData(iterator) => {
98 let output_directory = reporter
99 .new_directory_artifact(&DirectoryArtifactType::Debug, None )?;
100 Ok(artifacts::copy_debug_data(iterator.into_proxy(), output_directory)
101 .map(|()| Ok(None))
102 .named("debug_data")
103 .boxed())
104 }
105 ftest_manager::ArtifactUnknown!() => {
106 warn!("Encountered an unknown artifact");
107 Ok(futures::future::ready(Ok(None)).boxed())
108 }
109 }
110}
111
112async fn copy_socket_artifact<W: Write>(
114 socket: flex_client::Socket,
115 mut artifact: W,
116) -> Result<usize, anyhow::Error> {
117 let mut async_socket = flex_client::socket_to_async(socket);
118 let mut len = 0;
119 loop {
120 let done =
121 test_diagnostics::SocketReadFut::new(&mut async_socket, |maybe_buf| match maybe_buf {
122 Some(buf) => {
123 len += buf.len();
124 artifact.write_all(buf)?;
125 Ok(false)
126 }
127 None => Ok(true),
128 })
129 .await?;
130 if done {
131 artifact.flush()?;
132 return Ok(len);
133 }
134 }
135}
136
137async fn copy_socket_artifact_and_decompress<W: Write>(
140 socket: flex_client::Socket,
141 mut artifact: W,
142) -> Result<(usize, usize), anyhow::Error> {
143 let mut async_socket = flex_client::socket_to_async(socket);
144 let mut buf = vec![0u8; 1024 * 1024 * 2];
145
146 let (mut decoder, mut receiver) = Decoder::new();
147 let task: fasync::Task<Result<usize, anyhow::Error>> = fasync::Task::spawn(async move {
148 let mut len = 0;
149 loop {
150 let l = async_socket.read(&mut buf).await?;
151 match l {
152 0 => {
153 decoder.finish().await?;
154 break;
155 }
156 _ => {
157 len += l;
158 decoder.decompress(&buf[..l]).await?;
159 }
160 }
161 }
162 Ok(len)
163 });
164
165 let mut decompressed_len = 0;
166 while let Some(buf) = receiver.next().await {
167 decompressed_len += buf.len();
168 artifact.write_all(&buf)?;
169 }
170 artifact.flush()?;
171
172 let compressed_len = task.await?;
173 return Ok((decompressed_len, compressed_len));
174}
175
176pub async fn copy_debug_data(
178 iterator: ftest_manager::DebugDataIteratorProxy,
179 output_directory: Box<DynDirectoryArtifact>,
180) {
181 let start = std::time::Instant::now();
182 const PIPELINED_REQUESTS: usize = 4;
183 let unprocessed_data_stream =
184 futures::stream::repeat_with(move || iterator.get_next_compressed())
185 .buffered(PIPELINED_REQUESTS);
186 let terminated_event_stream =
187 unprocessed_data_stream.take_until_stop_after(|result| match &result {
188 Ok(events) => events.is_empty(),
189 _ => true,
190 });
191
192 let data_futs = terminated_event_stream
193 .map(|result| match result {
194 Ok(vals) => vals,
195 Err(e) => {
196 warn!("Request failure: {:?}", e);
197 vec![]
198 }
199 })
200 .map(futures::stream::iter)
201 .flatten()
202 .map(|debug_data| {
203 let output =
204 debug_data.name.as_ref().ok_or_else(|| anyhow!("Missing profile name")).and_then(
205 |name| {
206 output_directory.new_file(&PathBuf::from(name)).map_err(anyhow::Error::from)
207 },
208 );
209 fasync::Task::spawn(async move {
210 let _ = &debug_data;
211 let mut output = output?;
212 let socket =
213 debug_data.socket.ok_or_else(|| anyhow!("Missing profile socket handle"))?;
214 debug!("Reading run profile \"{:?}\"", debug_data.name);
215 let start = std::time::Instant::now();
216 let (decompressed_len, compressed_len) =
217 copy_socket_artifact_and_decompress(socket, &mut output).await.map_err(
218 |e| {
219 warn!("Error copying artifact '{:?}': {:?}", debug_data.name, e);
220 e
221 },
222 )?;
223
224 debug!(
225 "Copied file {:?}: {}({} - compressed) bytes in {:?}",
226 debug_data.name,
227 decompressed_len,
228 compressed_len,
229 start.elapsed()
230 );
231 Ok::<(), anyhow::Error>(())
232 })
233 })
234 .collect::<Vec<_>>()
235 .await;
236 join_all(data_futs).await;
237 debug!("All profiles downloaded in {:?}", start.elapsed());
238}
239
240async fn copy_custom_artifact_directory(
242 directory: fio::DirectoryProxy,
243 out_dir: Box<DynDirectoryArtifact>,
244) -> Result<(), anyhow::Error> {
245 let mut paths = vec![];
246 let mut enumerate = fuchsia_fs::directory::readdir_recursive(&directory, None);
247 while let Ok(Some(file)) = enumerate.try_next().await {
248 if file.kind == fuchsia_fs::directory::DirentKind::File {
249 paths.push(file.name);
250 }
251 }
252
253 let futs = FuturesUnordered::new();
254 paths.iter().for_each(|path| {
255 let file =
256 fuchsia_fs::directory::open_file_async(&directory, path, fuchsia_fs::PERM_READABLE);
257 let output_file = out_dir.new_file(std::path::Path::new(path));
258 futs.push(async move {
259 let file = file.with_context(|| format!("with path {:?}", path))?;
260 let mut output_file = output_file?;
261
262 copy_file_to_writer(&file, &mut output_file).await.map(|_| ())
263 });
264 });
265
266 futs.for_each(|result| {
267 if let Err(e) = result {
268 warn!("Custom artifact failure: {}", e);
269 }
270 async move {}
271 })
272 .await;
273
274 Ok(())
275}
276
277async fn copy_file_to_writer<T: Write>(
278 file: &fio::FileProxy,
279 output: &mut T,
280) -> Result<usize, anyhow::Error> {
281 const READ_SIZE: u64 = fio::MAX_BUF;
282
283 let mut vector = VecDeque::new();
284 const PIPELINED_READ_COUNT: u64 = 4;
286 for _n in 0..PIPELINED_READ_COUNT {
287 vector.push_back(file.read(READ_SIZE));
288 }
289 let mut len = 0;
290 loop {
291 let buf = vector.pop_front().unwrap().await?.map_err(zx_status::Status::from_raw)?;
292 if buf.is_empty() {
293 break;
294 }
295 len += buf.len();
296 output.write_all(&buf)?;
297 vector.push_back(file.read(READ_SIZE));
298 }
299 Ok(len)
300}
301
#[cfg(test)]
mod socket_tests {
    use super::*;
    use futures::AsyncWriteExt;

    /// Checks that `copy_socket_artifact` reproduces the bytes written to the
    /// peer socket for empty, short, and 4096-byte payloads.
    #[fuchsia::test]
    async fn copy_socket() {
        let payloads = vec![vec![], b"0123456789abcde".to_vec(), vec![0u8; 4096]];

        for payload in &payloads {
            let (client_socket, server_socket) = fidl::Socket::create_stream();
            let mut copied = vec![];
            // The writer owns the server end, so the socket closes once the write
            // future completes.
            let writer = async move {
                let mut async_socket = fidl::AsyncSocket::from_socket(server_socket);
                async_socket.write_all(payload.as_slice()).await.expect("write bytes");
            };
            let reader = copy_socket_artifact(client_socket, &mut copied);

            let ((), copy_result) = futures::future::join(writer, reader).await;
            copy_result.expect("copy contents");
            assert_eq!(copied.as_slice(), payload.as_slice());
        }
    }
}
327
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod file_tests {
    use super::*;
    use crate::output::InMemoryDirectoryWriter;
    use flex_fuchsia_io as fio;
    use fuchsia_async as fasync;
    use futures::prelude::*;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use vfs::directory::helper::DirectlyMutable;
    use vfs::directory::immutable::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    /// Writes all of `content` to `socket`.
    async fn serve_content_over_socket(content: Vec<u8>, socket: zx::Socket) {
        let mut async_socket = fidl::AsyncSocket::from_socket(socket);
        async_socket
            .write_all(content.as_slice())
            .await
            .expect("Cannot serve content over socket");
    }

    /// Serves `expected_files` as zstd-compressed debug data through a fake
    /// `DebugDataIterator` and runs `copy_debug_data` against it, writing
    /// into `directory_writer`.
    async fn serve_and_copy_debug_data(
        expected_files: &HashMap<PathBuf, Vec<u8>>,
        directory_writer: InMemoryDirectoryWriter,
    ) {
        let served_files: Vec<_> = expected_files
            .iter()
            .map(|(path, content)| {
                let mut compressor = zstd::bulk::Compressor::new(0).unwrap();
                let compressed = compressor.compress(content).unwrap();
                let (client, server) = zx::Socket::create_stream();
                fasync::Task::spawn(serve_content_over_socket(compressed, server)).detach();
                ftest_manager::DebugData {
                    name: Some(path.display().to_string()),
                    socket: Some(client.into()),
                    ..Default::default()
                }
            })
            .collect();

        let (iterator_proxy, mut iterator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ftest_manager::DebugDataIteratorMarker>();
        let serve_fut = async move {
            let mut remaining = served_files.into_iter();
            while let Ok(Some(request)) = iterator_stream.try_next().await {
                let responder = match request {
                    ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
                        responder
                    }
                    ftest_manager::DebugDataIteratorRequest::GetNext { .. } => {
                        panic!("Not Implemented");
                    }
                };
                // Hand out at most three entries per request; an empty batch follows
                // once everything has been served.
                let batch: Vec<_> = remaining.by_ref().take(3).collect();
                let _ = responder.send(batch);
            }
        };
        futures::future::join(
            serve_fut,
            copy_debug_data(iterator_proxy, Box::new(directory_writer)),
        )
        .await;
    }

    /// Snapshots the files recorded by `writer` as a path-to-contents map.
    fn written_files(writer: &InMemoryDirectoryWriter) -> HashMap<PathBuf, Vec<u8>> {
        writer
            .files
            .lock()
            .iter()
            .map(|(path, artifact)| (path.clone(), artifact.get_contents()))
            .collect()
    }

    /// Returns `(case name, directory to serve, expected path-to-contents map)`
    /// triples shared by the tests below.
    fn test_cases() -> Vec<(&'static str, Arc<Simple>, HashMap<PathBuf, Vec<u8>>)> {
        let hundred_files_dir = pseudo_directory! {};
        for i in 0..100 {
            hundred_files_dir
                .add_entry(format!("{:?}.txt", i), read_only(format!("contents for {:?}", i)))
                .expect("add file");
        }
        let hundred_files_expected: HashMap<PathBuf, Vec<u8>> = (0..100)
            .map(|i| (format!("{:?}.txt", i).into(), format!("contents for {:?}", i).into_bytes()))
            .collect();

        vec![
            ("empty", pseudo_directory! {}, hashmap! {}),
            (
                "single file",
                pseudo_directory! {
                    "test_file.txt" => read_only("Hello, World!"),
                },
                hashmap! {
                    "test_file.txt".to_string().into() => b"Hello, World!".to_vec()
                },
            ),
            (
                "subdir",
                pseudo_directory! {
                    "sub" => pseudo_directory! {
                        "nested.txt" => read_only("Nested file!"),
                    }
                },
                hashmap! {
                    "sub/nested.txt".to_string().into() => b"Nested file!".to_vec()
                },
            ),
            (
                "empty file",
                pseudo_directory! {
                    "empty.txt" => read_only(""),
                },
                hashmap! {
                    "empty.txt".to_string().into() => b"".to_vec()
                },
            ),
            (
                "big file",
                pseudo_directory! {
                    "big.txt" => read_only(vec![b's'; (fio::MAX_BUF as usize) * 2]),
                },
                hashmap! {
                    "big.txt".to_string().into() => vec![b's'; (fio::MAX_BUF as usize) * 2]
                },
            ),
            ("100 files", hundred_files_dir, hundred_files_expected),
        ]
    }

    /// `copy_custom_artifact_directory` must reproduce each served directory.
    #[fuchsia::test]
    async fn test_copy_dir() {
        for (name, fake_dir, expected_files) in test_cases() {
            let directory =
                vfs::directory::serve(fake_dir, fio::PERM_READABLE | fio::PERM_WRITABLE);
            let writer = InMemoryDirectoryWriter::default();
            copy_custom_artifact_directory(directory, Box::new(writer.clone()))
                .await
                .expect("reading custom directory");
            let actual_files = written_files(&writer);
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }

    /// `copy_debug_data` must reproduce each expected file set after
    /// decompression; the fake directories of the shared cases are unused here.
    #[fuchsia::test]
    async fn test_copy_debug_data() {
        for (name, _fake_dir, expected_files) in test_cases() {
            let writer = InMemoryDirectoryWriter::default();
            serve_and_copy_debug_data(&expected_files, writer.clone()).await;
            let actual_files = written_files(&writer);
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }
}
490}