1use crate::writer::{OutputSink, Writer};
6use anyhow::{Context as _, Error, Result, anyhow, bail};
7use diagnostics_data::LogsData;
8use flex_client::{self};
9use flex_fuchsia_fuzzer as fuzz;
10use futures::io::ReadHalf;
11use futures::{AsyncReadExt, try_join};
12use serde_json::Deserializer;
13use std::cell::RefCell;
14use std::path::Path;
15use std::rc::Rc;
16
17#[derive(Debug)]
19pub struct Forwarder<O: OutputSink> {
20 stdout: Option<SocketForwarder<O>>,
21 stderr: Option<SocketForwarder<O>>,
22 syslog: Option<SocketForwarder<O>>,
23 writer: Writer<O>,
24}
25
26impl<O: OutputSink> Forwarder<O> {
27 pub fn new(writer: &Writer<O>) -> Self {
36 Self { stdout: None, stderr: None, syslog: None, writer: writer.clone() }
37 }
38
39 pub fn set_output<P: AsRef<Path>>(
41 &mut self,
42 socket: flex_client::Socket,
43 output: fuzz::TestOutput,
44 logs_dir: &Option<P>,
45 ) -> Result<()> {
46 match output {
47 fuzz::TestOutput::Stdout => {
48 let forwarder = self.create_forwarder(logs_dir, "stdout", "txt", socket)?;
49 self.stdout = Some(forwarder);
50 }
51 fuzz::TestOutput::Stderr => {
52 let forwarder = self.create_forwarder(logs_dir, "stderr", "txt", socket)?;
53 self.stderr = Some(forwarder);
54 }
55 fuzz::TestOutput::Syslog => {
56 let forwarder = self.create_forwarder(logs_dir, "syslog", "json", socket)?;
57 self.syslog = Some(forwarder);
58 }
59 _ => unreachable!(),
60 }
61 Ok(())
62 }
63
64 fn create_forwarder<P: AsRef<Path>>(
65 &self,
66 logs_dir: &Option<P>,
67 name: &str,
68 extension: &str,
69 socket: flex_client::Socket,
70 ) -> Result<SocketForwarder<O>> {
71 let writer = match logs_dir {
72 Some(logs_dir) => self
73 .writer
74 .tee(logs_dir, format!("fuzzer.{}.{}", name, extension))
75 .context(format!("failed to create file for {}", name)),
76 None => Ok(self.writer.clone()),
77 }?;
78 let forwarder = SocketForwarder::try_new(socket, &writer)
79 .context(format!("failed to create forwarder for {}", name))?;
80 Ok(forwarder)
81 }
82
83 pub async fn forward_all(&self) -> Result<()> {
85 let stdout = self.stdout.clone();
86 let stdout_fut = || async move {
87 if let Some(stdout) = stdout {
88 stdout.forward_text("stdout").await.context("failed to forward stdout")?;
89 }
90 Ok::<(), Error>(())
91 };
92
93 let stderr = self.stderr.clone();
94 let stderr_fut = || async move {
95 if let Some(stderr) = stderr {
96 stderr.forward_text("stderr").await.context("failed to forward stderr")?;
97 }
98 Ok::<(), Error>(())
99 };
100
101 let syslog = self.syslog.clone();
102 let syslog_fut = || async move {
103 if let Some(syslog) = syslog {
104 syslog.forward_json("syslog").await.context("failed to forward syslog")?;
105 }
106 Ok::<(), Error>(())
107 };
108
109 try_join!(stdout_fut(), stderr_fut(), syslog_fut())?;
110 Ok(())
111 }
112}
113
114#[derive(Debug)]
116pub struct SocketForwarder<O: OutputSink> {
117 reader: Rc<RefCell<ReadHalf<flex_client::AsyncSocket>>>,
118 writer: Writer<O>,
119}
120
121impl<O: OutputSink> Clone for SocketForwarder<O> {
122 fn clone(&self) -> Self {
123 Self { reader: Rc::clone(&self.reader), writer: self.writer.clone() }
124 }
125}
126
127impl<O: OutputSink> SocketForwarder<O> {
128 pub fn try_new(socket: flex_client::Socket, writer: &Writer<O>) -> Result<Self> {
132 let socket = flex_client::socket_to_async(socket);
133 let (reader, _) = socket.split();
134 Ok(Self { reader: Rc::new(RefCell::new(reader)), writer: writer.clone() })
135 }
136
137 pub async fn forward_text(&self, name: &str) -> Result<()> {
139 let mut reader = self.reader.borrow_mut();
140 let mut buf: [u8; 2048] = [0; 2048];
141 let mut raw = Vec::new();
142 let newline = '\n' as u8;
143 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
144 let done_marker = done_marker.as_bytes();
145 loop {
146 match reader
147 .read(&mut buf)
148 .await
149 .context(format!("failed to read text data from {} socket", name))?
150 {
151 0 => {
152 self.writer.write_all(&raw);
153 bail!("{} from fuzzer ended prematurely", name);
154 }
155 num_read => raw.extend_from_slice(&buf[0..num_read]),
156 };
157 let data = raw;
158 raw = Vec::new();
159 for message in data.split_inclusive(|&x| x == newline) {
160 if message == done_marker {
161 return Ok(());
162 } else if message.last() == Some(&newline) {
163 self.writer.write_all(&message);
164 } else {
165 raw = message.to_vec();
166 }
167 }
168 }
169 }
170
171 pub async fn forward_json(&self, name: &str) -> Result<()> {
173 let mut reader = self.reader.borrow_mut();
174 let mut buf: [u8; 2048] = [0; 2048];
175 let mut raw = Vec::new();
176 loop {
177 match reader
178 .read(&mut buf)
179 .await
180 .context(format!("failed to read JSON data from {} socket", name))?
181 {
182 0 => {
183 self.writer.write_all(&raw);
184 bail!("{} from fuzzer ended prematurely", name);
185 }
186 num_read => raw.extend_from_slice(&buf[0..num_read]),
187 };
188 let deserializer = Deserializer::from_slice(&raw);
189 let mut stream = deserializer.into_iter::<Vec<LogsData>>();
190 while let Some(items) = stream.next() {
191 let logs_data = match items {
192 Err(e) if e.is_eof() => break,
193 other => other,
194 }
195 .map_err(|e| anyhow!(format!("serde_json: {:?}", e)))
196 .context("failed to deserialize")?;
197 for log_data in logs_data.into_iter() {
198 if let Some(message) = log_data.msg() {
199 if message == fuzz::DONE_MARKER {
200 return Ok(());
201 }
202 }
203 self.writer.log(log_data);
204 }
205 }
206 let num_read = stream.byte_offset();
207 raw.drain(0..num_read);
208 }
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use anyhow::{Error, Result};
215 use diagnostics_data::LogsData;
216 use flex_fuchsia_fuzzer as fuzz;
217 use fuchsia_fuzzctl::{Forwarder, SocketForwarder};
218 use fuchsia_fuzzctl_test::{Test, send_log_entry};
219 use futures::{AsyncWriteExt, try_join};
220 use std::fs;
221
222 #[fuchsia::test]
223 async fn test_forward_text() -> Result<()> {
224 let mut test = Test::try_new()?;
225 let (tx, rx) = test.domain().create_stream_socket();
226 let forwarder = SocketForwarder::try_new(rx, test.writer())?;
227 let socket_fut = || async move {
228 let mut tx = flex_client::socket_to_async(tx);
229 tx.write_all(b"hello\nworld!\n").await?;
230 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
231 tx.write_all(done_marker.as_bytes()).await?;
232 Ok::<(), Error>(())
233 };
234 test.output_matches("hello");
235 test.output_matches("world!");
236 try_join!(forwarder.forward_text("test"), socket_fut())?;
237 test.verify_output()
238 }
239
240 #[fuchsia::test]
241 async fn test_forward_json() -> Result<()> {
242 let mut test = Test::try_new()?;
243 let (tx, rx) = test.domain().create_stream_socket();
244 let forwarder = SocketForwarder::try_new(rx, test.writer())?;
245 let socket_fut = || async move {
246 let mut tx = flex_client::socket_to_async(tx);
247 send_log_entry(&mut tx, "hello world").await?;
248 send_log_entry(&mut tx, fuzz::DONE_MARKER).await?;
249 Ok::<(), Error>(())
250 };
251 try_join!(forwarder.forward_json("test"), socket_fut())?;
252 test.output_matches("[0.000][moniker][][I] hello world");
253 test.verify_output()
254 }
255
256 #[fuchsia::test]
257 async fn test_forward_all() -> Result<()> {
258 let mut test = Test::try_new()?;
259 let logs_dir = test.create_dir("logs")?;
260 let logs_dir = Some(logs_dir);
261 let mut forwarder = Forwarder::new(test.writer());
262
263 let (stdout_tx, stdout_rx) = test.domain().create_stream_socket();
264 forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
265
266 let (stderr_tx, stderr_rx) = test.domain().create_stream_socket();
267 forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
268
269 let (syslog_tx, syslog_rx) = test.domain().create_stream_socket();
270 forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
271
272 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
273 let done_marker_bytes = done_marker.as_bytes();
274
275 let a_done_marker = format!("a{}\n", fuzz::DONE_MARKER);
276 test.output_matches(a_done_marker.clone());
277
278 let done_marker_a = format!("{}a\n", fuzz::DONE_MARKER);
279 test.output_matches(done_marker_a.clone());
280
281 let socket_fut = || async move {
282 let mut stdout_tx = flex_client::socket_to_async(stdout_tx);
283 let mut stderr_tx = flex_client::socket_to_async(stderr_tx);
284 let mut syslog_tx = flex_client::socket_to_async(syslog_tx);
285
286 send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
288
289 stdout_tx.write_all(done_marker_bytes).await?;
291 stdout_tx.write_all(b"after\n").await?;
292
293 stderr_tx.write_all(a_done_marker.as_bytes()).await?;
295 stderr_tx.write_all(done_marker_a.as_bytes()).await?;
296 for i in 0..done_marker_bytes.len() {
297 stderr_tx.write_all(&done_marker_bytes[i..i + 1]).await?;
298 }
299 stderr_tx.write_all(b"after\n").await?;
300 Ok::<(), Error>(())
301 };
302
303 try_join!(forwarder.forward_all(), socket_fut())?;
304 test.verify_output()
305 }
306
307 #[fuchsia::test]
308 async fn test_forward_to_file() -> Result<()> {
309 let test = Test::try_new()?;
310
311 let logs_dir = test.create_dir("logs")?;
312 let logs_dir = Some(logs_dir);
313 let mut forwarder = Forwarder::new(test.writer());
314
315 let (stdout_tx, stdout_rx) = test.domain().create_stream_socket();
316 forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
317
318 let (stderr_tx, stderr_rx) = test.domain().create_stream_socket();
319 forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
320
321 let (syslog_tx, syslog_rx) = test.domain().create_stream_socket();
322 forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
323
324 let sockets_fut = || async move {
325 let done_marker = format!("{}\n", fuzz::DONE_MARKER);
326 let done_marker_bytes = done_marker.as_bytes();
327
328 let mut stdout_tx = flex_client::socket_to_async(stdout_tx);
330 stdout_tx.write_all(b"hello world!\n").await?;
331 stdout_tx.write_all(done_marker_bytes).await?;
332
333 let mut stderr_tx = flex_client::socket_to_async(stderr_tx);
335 stderr_tx.write_all(b"hel").await?;
336 stderr_tx.write_all(b"lo ").await?;
337 stderr_tx.write_all(b"wor").await?;
338 stderr_tx.write_all(b"ld!\n").await?;
339 stderr_tx.write_all(done_marker_bytes).await?;
340
341 let mut syslog_tx = flex_client::socket_to_async(syslog_tx);
343 send_log_entry(&mut syslog_tx, "hello world!").await?;
344 send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
345 Ok::<(), Error>(())
346 };
347 let sockets_fut = sockets_fut();
348 try_join!(forwarder.forward_all(), sockets_fut)?;
349 let logs_dir = logs_dir.unwrap();
350 assert_eq!(fs::read(logs_dir.join("fuzzer.stdout.txt"))?, b"hello world!\n");
351 assert_eq!(fs::read(logs_dir.join("fuzzer.stderr.txt"))?, b"hello world!\n");
352 let data = fs::read(logs_dir.join("fuzzer.syslog.json"))?;
353 let logs_data: LogsData = serde_json::from_slice(&data)?;
354 assert_eq!(logs_data.msg(), Some("hello world!"));
355 Ok(())
356 }
357}