1use crate::writer::{OutputSink, Writer};
6use anyhow::{anyhow, bail, Context as _, Error, Result};
7use diagnostics_data::LogsData;
8use fidl_fuchsia_fuzzer as fuzz;
9use futures::io::ReadHalf;
10use futures::{try_join, AsyncReadExt};
11use serde_json::Deserializer;
12use std::cell::RefCell;
13use std::path::Path;
14use std::rc::Rc;
15
16#[derive(Debug)]
18pub struct Forwarder<O: OutputSink> {
19 stdout: Option<SocketForwarder<O>>,
20 stderr: Option<SocketForwarder<O>>,
21 syslog: Option<SocketForwarder<O>>,
22 writer: Writer<O>,
23}
24
25impl<O: OutputSink> Forwarder<O> {
26 pub fn new(writer: &Writer<O>) -> Self {
35 Self { stdout: None, stderr: None, syslog: None, writer: writer.clone() }
36 }
37
38 pub fn set_output<P: AsRef<Path>>(
40 &mut self,
41 socket: fidl::Socket,
42 output: fuzz::TestOutput,
43 logs_dir: &Option<P>,
44 ) -> Result<()> {
45 match output {
46 fuzz::TestOutput::Stdout => {
47 let forwarder = self.create_forwarder(logs_dir, "stdout", "txt", socket)?;
48 self.stdout = Some(forwarder);
49 }
50 fuzz::TestOutput::Stderr => {
51 let forwarder = self.create_forwarder(logs_dir, "stderr", "txt", socket)?;
52 self.stderr = Some(forwarder);
53 }
54 fuzz::TestOutput::Syslog => {
55 let forwarder = self.create_forwarder(logs_dir, "syslog", "json", socket)?;
56 self.syslog = Some(forwarder);
57 }
58 _ => unreachable!(),
59 }
60 Ok(())
61 }
62
63 fn create_forwarder<P: AsRef<Path>>(
64 &self,
65 logs_dir: &Option<P>,
66 name: &str,
67 extension: &str,
68 socket: fidl::Socket,
69 ) -> Result<SocketForwarder<O>> {
70 let writer = match logs_dir {
71 Some(logs_dir) => self
72 .writer
73 .tee(logs_dir, format!("fuzzer.{}.{}", name, extension))
74 .context(format!("failed to create file for {}", name)),
75 None => Ok(self.writer.clone()),
76 }?;
77 let forwarder = SocketForwarder::try_new(socket, &writer)
78 .context(format!("failed to create forwarder for {}", name))?;
79 Ok(forwarder)
80 }
81
82 pub async fn forward_all(&self) -> Result<()> {
84 let stdout = self.stdout.clone();
85 let stdout_fut = || async move {
86 if let Some(stdout) = stdout {
87 stdout.forward_text("stdout").await.context("failed to forward stdout")?;
88 }
89 Ok::<(), Error>(())
90 };
91
92 let stderr = self.stderr.clone();
93 let stderr_fut = || async move {
94 if let Some(stderr) = stderr {
95 stderr.forward_text("stderr").await.context("failed to forward stderr")?;
96 }
97 Ok::<(), Error>(())
98 };
99
100 let syslog = self.syslog.clone();
101 let syslog_fut = || async move {
102 if let Some(syslog) = syslog {
103 syslog.forward_json("syslog").await.context("failed to forward syslog")?;
104 }
105 Ok::<(), Error>(())
106 };
107
108 try_join!(stdout_fut(), stderr_fut(), syslog_fut())?;
109 Ok(())
110 }
111}
112
113#[derive(Debug)]
115pub struct SocketForwarder<O: OutputSink> {
116 reader: Rc<RefCell<ReadHalf<fidl::AsyncSocket>>>,
117 writer: Writer<O>,
118}
119
120impl<O: OutputSink> Clone for SocketForwarder<O> {
121 fn clone(&self) -> Self {
122 Self { reader: Rc::clone(&self.reader), writer: self.writer.clone() }
123 }
124}
125
126impl<O: OutputSink> SocketForwarder<O> {
127 pub fn try_new(socket: fidl::Socket, writer: &Writer<O>) -> Result<Self> {
131 let socket = fidl::AsyncSocket::from_socket(socket);
132 let (reader, _) = socket.split();
133 Ok(Self { reader: Rc::new(RefCell::new(reader)), writer: writer.clone() })
134 }
135
136 pub async fn forward_text(&self, name: &str) -> Result<()> {
138 let mut reader = self.reader.borrow_mut();
139 let mut buf: [u8; 2048] = [0; 2048];
140 let mut raw = Vec::new();
141 let newline = '\n' as u8;
142 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
143 let done_marker = done_marker.as_bytes();
144 loop {
145 match reader
146 .read(&mut buf)
147 .await
148 .context(format!("failed to read text data from {} socket", name))?
149 {
150 0 => {
151 self.writer.write_all(&raw);
152 bail!("{} from fuzzer ended prematurely", name);
153 }
154 num_read => raw.extend_from_slice(&buf[0..num_read]),
155 };
156 let data = raw;
157 raw = Vec::new();
158 for message in data.split_inclusive(|&x| x == newline) {
159 if message == done_marker {
160 return Ok(());
161 } else if message.last() == Some(&newline) {
162 self.writer.write_all(&message);
163 } else {
164 raw = message.to_vec();
165 }
166 }
167 }
168 }
169
170 pub async fn forward_json(&self, name: &str) -> Result<()> {
172 let mut reader = self.reader.borrow_mut();
173 let mut buf: [u8; 2048] = [0; 2048];
174 let mut raw = Vec::new();
175 loop {
176 match reader
177 .read(&mut buf)
178 .await
179 .context(format!("failed to read JSON data from {} socket", name))?
180 {
181 0 => {
182 self.writer.write_all(&raw);
183 bail!("{} from fuzzer ended prematurely", name);
184 }
185 num_read => raw.extend_from_slice(&buf[0..num_read]),
186 };
187 let deserializer = Deserializer::from_slice(&raw);
188 let mut stream = deserializer.into_iter::<Vec<LogsData>>();
189 while let Some(items) = stream.next() {
190 let logs_data = match items {
191 Err(e) if e.is_eof() => break,
192 other => other,
193 }
194 .map_err(|e| anyhow!(format!("serde_json: {:?}", e)))
195 .context("failed to deserialize")?;
196 for log_data in logs_data.into_iter() {
197 if let Some(message) = log_data.msg() {
198 if message == fuzz::DONE_MARKER {
199 return Ok(());
200 }
201 }
202 self.writer.log(log_data);
203 }
204 }
205 let num_read = stream.byte_offset();
206 raw.drain(0..num_read);
207 }
208 }
209}
210
211#[cfg(test)]
212mod tests {
213 use anyhow::{Error, Result};
214 use diagnostics_data::LogsData;
215 use fidl::Socket;
216 use fidl_fuchsia_fuzzer as fuzz;
217 use fuchsia_fuzzctl::{Forwarder, SocketForwarder};
218 use fuchsia_fuzzctl_test::{send_log_entry, Test};
219 use futures::{try_join, AsyncWriteExt};
220 use std::fs;
221
222 #[fuchsia::test]
223 async fn test_forward_text() -> Result<()> {
224 let mut test = Test::try_new()?;
225 let (tx, rx) = Socket::create_stream();
226 let forwarder = SocketForwarder::try_new(rx, test.writer())?;
227 let socket_fut = || async move {
228 let mut tx = fidl::AsyncSocket::from_socket(tx);
229 tx.write_all(b"hello\nworld!\n").await?;
230 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
231 tx.write_all(done_marker.as_bytes()).await?;
232 Ok::<(), Error>(())
233 };
234 test.output_matches("hello");
235 test.output_matches("world!");
236 try_join!(forwarder.forward_text("test"), socket_fut())?;
237 test.verify_output()
238 }
239
240 #[fuchsia::test]
241 async fn test_forward_json() -> Result<()> {
242 let mut test = Test::try_new()?;
243 let (tx, rx) = Socket::create_stream();
244 let forwarder = SocketForwarder::try_new(rx, test.writer())?;
245 let socket_fut = || async move {
246 let mut tx = fidl::AsyncSocket::from_socket(tx);
247 send_log_entry(&mut tx, "hello world").await?;
248 send_log_entry(&mut tx, fuzz::DONE_MARKER).await?;
249 Ok::<(), Error>(())
250 };
251 try_join!(forwarder.forward_json("test"), socket_fut())?;
252 test.output_matches("[0.000][moniker][][I] hello world");
253 test.verify_output()
254 }
255
256 #[fuchsia::test]
257 async fn test_forward_all() -> Result<()> {
258 let mut test = Test::try_new()?;
259 let logs_dir = test.create_dir("logs")?;
260 let logs_dir = Some(logs_dir);
261 let mut forwarder = Forwarder::new(test.writer());
262
263 let (stdout_tx, stdout_rx) = Socket::create_stream();
264 forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
265
266 let (stderr_tx, stderr_rx) = Socket::create_stream();
267 forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
268
269 let (syslog_tx, syslog_rx) = Socket::create_stream();
270 forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
271
272 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
273 let done_marker_bytes = done_marker.as_bytes();
274
275 let a_done_marker = format!("a{}\n", fuzz::DONE_MARKER);
276 test.output_matches(a_done_marker.clone());
277
278 let done_marker_a = format!("{}a\n", fuzz::DONE_MARKER);
279 test.output_matches(done_marker_a.clone());
280
281 let socket_fut = || async move {
282 let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
283 let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
284 let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
285
286 send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
288
289 stdout_tx.write_all(done_marker_bytes).await?;
291 stdout_tx.write_all(b"after\n").await?;
292
293 stderr_tx.write_all(a_done_marker.as_bytes()).await?;
295 stderr_tx.write_all(done_marker_a.as_bytes()).await?;
296 for i in 0..done_marker_bytes.len() {
297 stderr_tx.write_all(&done_marker_bytes[i..i + 1]).await?;
298 }
299 stderr_tx.write_all(b"after\n").await?;
300 Ok::<(), Error>(())
301 };
302
303 try_join!(forwarder.forward_all(), socket_fut())?;
304 test.verify_output()
305 }
306
307 #[fuchsia::test]
308 async fn test_forward_to_file() -> Result<()> {
309 let test = Test::try_new()?;
310
311 let logs_dir = test.create_dir("logs")?;
312 let logs_dir = Some(logs_dir);
313 let mut forwarder = Forwarder::new(test.writer());
314
315 let (stdout_tx, stdout_rx) = Socket::create_stream();
316 forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
317
318 let (stderr_tx, stderr_rx) = Socket::create_stream();
319 forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
320
321 let (syslog_tx, syslog_rx) = Socket::create_stream();
322 forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
323
324 let sockets_fut = || async move {
325 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
326 let done_marker_bytes = done_marker.as_bytes();
327
328 let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
330 stdout_tx.write_all(b"hello world!\n").await?;
331 stdout_tx.write_all(done_marker_bytes).await?;
332
333 let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
335 stderr_tx.write_all(b"hel").await?;
336 stderr_tx.write_all(b"lo ").await?;
337 stderr_tx.write_all(b"wor").await?;
338 stderr_tx.write_all(b"ld!\n").await?;
339 stderr_tx.write_all(done_marker_bytes).await?;
340
341 let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
343 send_log_entry(&mut syslog_tx, "hello world!").await?;
344 send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
345 Ok::<(), Error>(())
346 };
347 let sockets_fut = sockets_fut();
348 try_join!(forwarder.forward_all(), sockets_fut)?;
349 let logs_dir = logs_dir.unwrap();
350 assert_eq!(fs::read(logs_dir.join("fuzzer.stdout.txt"))?, b"hello world!\n");
351 assert_eq!(fs::read(logs_dir.join("fuzzer.stderr.txt"))?, b"hello world!\n");
352 let data = fs::read(logs_dir.join("fuzzer.syslog.json"))?;
353 let logs_data: LogsData = serde_json::from_slice(&data)?;
354 assert_eq!(logs_data.msg(), Some("hello world!"));
355 Ok(())
356 }
357}