use crate::writer::{OutputSink, Writer};
use anyhow::{anyhow, bail, Context as _, Error, Result};
use diagnostics_data::LogsData;
use fidl_fuchsia_fuzzer as fuzz;
use futures::io::ReadHalf;
use futures::{try_join, AsyncReadExt};
use serde_json::Deserializer;
use std::cell::RefCell;
use std::path::Path;
use std::rc::Rc;
#[derive(Debug)]
pub struct Forwarder<O: OutputSink> {
stdout: Option<SocketForwarder<O>>,
stderr: Option<SocketForwarder<O>>,
syslog: Option<SocketForwarder<O>>,
writer: Writer<O>,
}
impl<O: OutputSink> Forwarder<O> {
pub fn new(writer: &Writer<O>) -> Self {
Self { stdout: None, stderr: None, syslog: None, writer: writer.clone() }
}
pub fn set_output<P: AsRef<Path>>(
&mut self,
socket: fidl::Socket,
output: fuzz::TestOutput,
logs_dir: &Option<P>,
) -> Result<()> {
match output {
fuzz::TestOutput::Stdout => {
let forwarder = self.create_forwarder(logs_dir, "stdout", "txt", socket)?;
self.stdout = Some(forwarder);
}
fuzz::TestOutput::Stderr => {
let forwarder = self.create_forwarder(logs_dir, "stderr", "txt", socket)?;
self.stderr = Some(forwarder);
}
fuzz::TestOutput::Syslog => {
let forwarder = self.create_forwarder(logs_dir, "syslog", "json", socket)?;
self.syslog = Some(forwarder);
}
_ => unreachable!(),
}
Ok(())
}
fn create_forwarder<P: AsRef<Path>>(
&self,
logs_dir: &Option<P>,
name: &str,
extension: &str,
socket: fidl::Socket,
) -> Result<SocketForwarder<O>> {
let writer = match logs_dir {
Some(logs_dir) => self
.writer
.tee(logs_dir, format!("fuzzer.{}.{}", name, extension))
.context(format!("failed to create file for {}", name)),
None => Ok(self.writer.clone()),
}?;
let forwarder = SocketForwarder::try_new(socket, &writer)
.context(format!("failed to create forwarder for {}", name))?;
Ok(forwarder)
}
pub async fn forward_all(&self) -> Result<()> {
let stdout = self.stdout.clone();
let stdout_fut = || async move {
if let Some(stdout) = stdout {
stdout.forward_text("stdout").await.context("failed to forward stdout")?;
}
Ok::<(), Error>(())
};
let stderr = self.stderr.clone();
let stderr_fut = || async move {
if let Some(stderr) = stderr {
stderr.forward_text("stderr").await.context("failed to forward stderr")?;
}
Ok::<(), Error>(())
};
let syslog = self.syslog.clone();
let syslog_fut = || async move {
if let Some(syslog) = syslog {
syslog.forward_json("syslog").await.context("failed to forward syslog")?;
}
Ok::<(), Error>(())
};
try_join!(stdout_fut(), stderr_fut(), syslog_fut())?;
Ok(())
}
}
#[derive(Debug)]
pub struct SocketForwarder<O: OutputSink> {
reader: Rc<RefCell<ReadHalf<fidl::AsyncSocket>>>,
writer: Writer<O>,
}
impl<O: OutputSink> Clone for SocketForwarder<O> {
fn clone(&self) -> Self {
Self { reader: Rc::clone(&self.reader), writer: self.writer.clone() }
}
}
impl<O: OutputSink> SocketForwarder<O> {
pub fn try_new(socket: fidl::Socket, writer: &Writer<O>) -> Result<Self> {
let socket = fidl::AsyncSocket::from_socket(socket);
let (reader, _) = socket.split();
Ok(Self { reader: Rc::new(RefCell::new(reader)), writer: writer.clone() })
}
pub async fn forward_text(&self, name: &str) -> Result<()> {
let mut reader = self.reader.borrow_mut();
let mut buf: [u8; 2048] = [0; 2048];
let mut raw = Vec::new();
let newline = '\n' as u8;
let done_marker = format!("{}\n", fuzz::DONE_MARKER);
let done_marker = done_marker.as_bytes();
loop {
match reader
.read(&mut buf)
.await
.context(format!("failed to read text data from {} socket", name))?
{
0 => {
self.writer.write_all(&raw);
bail!("{} from fuzzer ended prematurely", name);
}
num_read => raw.extend_from_slice(&buf[0..num_read]),
};
let data = raw;
raw = Vec::new();
for message in data.split_inclusive(|&x| x == newline) {
if message == done_marker {
return Ok(());
} else if message.last() == Some(&newline) {
self.writer.write_all(&message);
} else {
raw = message.to_vec();
}
}
}
}
pub async fn forward_json(&self, name: &str) -> Result<()> {
let mut reader = self.reader.borrow_mut();
let mut buf: [u8; 2048] = [0; 2048];
let mut raw = Vec::new();
loop {
match reader
.read(&mut buf)
.await
.context(format!("failed to read JSON data from {} socket", name))?
{
0 => {
self.writer.write_all(&raw);
bail!("{} from fuzzer ended prematurely", name);
}
num_read => raw.extend_from_slice(&buf[0..num_read]),
};
let deserializer = Deserializer::from_slice(&raw);
let mut stream = deserializer.into_iter::<Vec<LogsData>>();
while let Some(items) = stream.next() {
let logs_data = match items {
Err(e) if e.is_eof() => break,
other => other,
}
.map_err(|e| anyhow!(format!("serde_json: {:?}", e)))
.context("failed to deserialize")?;
for log_data in logs_data.into_iter() {
if let Some(message) = log_data.msg() {
if message == fuzz::DONE_MARKER {
return Ok(());
}
}
self.writer.log(log_data);
}
}
let num_read = stream.byte_offset();
raw.drain(0..num_read);
}
}
}
#[cfg(test)]
mod tests {
use anyhow::{Error, Result};
use diagnostics_data::LogsData;
use fidl::Socket;
use fidl_fuchsia_fuzzer as fuzz;
use fuchsia_fuzzctl::{Forwarder, SocketForwarder};
use fuchsia_fuzzctl_test::{send_log_entry, Test};
use futures::{try_join, AsyncWriteExt};
use std::fs;
#[fuchsia::test]
async fn test_forward_text() -> Result<()> {
let mut test = Test::try_new()?;
let (tx, rx) = Socket::create_stream();
let forwarder = SocketForwarder::try_new(rx, test.writer())?;
let socket_fut = || async move {
let mut tx = fidl::AsyncSocket::from_socket(tx);
tx.write_all(b"hello\nworld!\n").await?;
let done_marker = format!("{}\n", fuzz::DONE_MARKER);
tx.write_all(done_marker.as_bytes()).await?;
Ok::<(), Error>(())
};
test.output_matches("hello");
test.output_matches("world!");
try_join!(forwarder.forward_text("test"), socket_fut())?;
test.verify_output()
}
#[fuchsia::test]
async fn test_forward_json() -> Result<()> {
let mut test = Test::try_new()?;
let (tx, rx) = Socket::create_stream();
let forwarder = SocketForwarder::try_new(rx, test.writer())?;
let socket_fut = || async move {
let mut tx = fidl::AsyncSocket::from_socket(tx);
send_log_entry(&mut tx, "hello world").await?;
send_log_entry(&mut tx, fuzz::DONE_MARKER).await?;
Ok::<(), Error>(())
};
try_join!(forwarder.forward_json("test"), socket_fut())?;
test.output_matches("[0.000][moniker][][I] hello world");
test.verify_output()
}
#[fuchsia::test]
async fn test_forward_all() -> Result<()> {
let mut test = Test::try_new()?;
let logs_dir = test.create_dir("logs")?;
let logs_dir = Some(logs_dir);
let mut forwarder = Forwarder::new(test.writer());
let (stdout_tx, stdout_rx) = Socket::create_stream();
forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
let (stderr_tx, stderr_rx) = Socket::create_stream();
forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
let (syslog_tx, syslog_rx) = Socket::create_stream();
forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
let done_marker = format!("{}\n", fuzz::DONE_MARKER);
let done_marker_bytes = done_marker.as_bytes();
let a_done_marker = format!("a{}\n", fuzz::DONE_MARKER);
test.output_matches(a_done_marker.clone());
let done_marker_a = format!("{}a\n", fuzz::DONE_MARKER);
test.output_matches(done_marker_a.clone());
let socket_fut = || async move {
let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
stdout_tx.write_all(done_marker_bytes).await?;
stdout_tx.write_all(b"after\n").await?;
stderr_tx.write_all(a_done_marker.as_bytes()).await?;
stderr_tx.write_all(done_marker_a.as_bytes()).await?;
for i in 0..done_marker_bytes.len() {
stderr_tx.write_all(&done_marker_bytes[i..i + 1]).await?;
}
stderr_tx.write_all(b"after\n").await?;
Ok::<(), Error>(())
};
try_join!(forwarder.forward_all(), socket_fut())?;
test.verify_output()
}
#[fuchsia::test]
async fn test_forward_to_file() -> Result<()> {
let test = Test::try_new()?;
let logs_dir = test.create_dir("logs")?;
let logs_dir = Some(logs_dir);
let mut forwarder = Forwarder::new(test.writer());
let (stdout_tx, stdout_rx) = Socket::create_stream();
forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
let (stderr_tx, stderr_rx) = Socket::create_stream();
forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
let (syslog_tx, syslog_rx) = Socket::create_stream();
forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
let sockets_fut = || async move {
let done_marker = format!("{}\n", fuzz::DONE_MARKER);
let done_marker_bytes = done_marker.as_bytes();
let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
stdout_tx.write_all(b"hello world!\n").await?;
stdout_tx.write_all(done_marker_bytes).await?;
let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
stderr_tx.write_all(b"hel").await?;
stderr_tx.write_all(b"lo ").await?;
stderr_tx.write_all(b"wor").await?;
stderr_tx.write_all(b"ld!\n").await?;
stderr_tx.write_all(done_marker_bytes).await?;
let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
send_log_entry(&mut syslog_tx, "hello world!").await?;
send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
Ok::<(), Error>(())
};
let sockets_fut = sockets_fut();
try_join!(forwarder.forward_all(), sockets_fut)?;
let logs_dir = logs_dir.unwrap();
assert_eq!(fs::read(logs_dir.join("fuzzer.stdout.txt"))?, b"hello world!\n");
assert_eq!(fs::read(logs_dir.join("fuzzer.stderr.txt"))?, b"hello world!\n");
let data = fs::read(logs_dir.join("fuzzer.syslog.json"))?;
let logs_data: LogsData = serde_json::from_slice(&data)?;
assert_eq!(logs_data.msg(), Some("hello world!"));
Ok(())
}
}