Skip to main content

fuchsia_fuzzctl_fdomain/
diagnostics.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::writer::{OutputSink, Writer};
6use anyhow::{Context as _, Error, Result, anyhow, bail};
7use diagnostics_data::LogsData;
8use flex_client::{self};
9use flex_fuchsia_fuzzer as fuzz;
10use futures::io::ReadHalf;
11use futures::{AsyncReadExt, try_join};
12use serde_json::Deserializer;
13use std::cell::RefCell;
14use std::path::Path;
15use std::rc::Rc;
16
/// Composition of `SocketForwarder`s for standard output, standard error, and system logs.
///
/// Every stream starts out unset (`None`) and is populated by `Forwarder::set_output`.
#[derive(Debug)]
pub struct Forwarder<O: OutputSink> {
    /// Forwarder for the fuzzer's standard output, if one has been registered.
    stdout: Option<SocketForwarder<O>>,
    /// Forwarder for the fuzzer's standard error, if one has been registered.
    stderr: Option<SocketForwarder<O>>,
    /// Forwarder for the fuzzer's system log, if one has been registered.
    syslog: Option<SocketForwarder<O>>,
    /// Writer that each registered forwarder's writer is derived from.
    writer: Writer<O>,
}
25
26impl<O: OutputSink> Forwarder<O> {
27    /// Creates a `Forwarder` that can forward data to the `writer`.
28    ///
29    /// Output will also be saved to the following files under the given `logs_dir` directory:
30    ///
31    ///   * fuzzer.stdout.txt
32    ///   * fuzzer.stderr.txt
33    ///   * fuzzer.syslog.json
34    ///
35    pub fn new(writer: &Writer<O>) -> Self {
36        Self { stdout: None, stderr: None, syslog: None, writer: writer.clone() }
37    }
38
39    /// Registers the provided output socket.
40    pub fn set_output<P: AsRef<Path>>(
41        &mut self,
42        socket: flex_client::Socket,
43        output: fuzz::TestOutput,
44        logs_dir: &Option<P>,
45    ) -> Result<()> {
46        match output {
47            fuzz::TestOutput::Stdout => {
48                let forwarder = self.create_forwarder(logs_dir, "stdout", "txt", socket)?;
49                self.stdout = Some(forwarder);
50            }
51            fuzz::TestOutput::Stderr => {
52                let forwarder = self.create_forwarder(logs_dir, "stderr", "txt", socket)?;
53                self.stderr = Some(forwarder);
54            }
55            fuzz::TestOutput::Syslog => {
56                let forwarder = self.create_forwarder(logs_dir, "syslog", "json", socket)?;
57                self.syslog = Some(forwarder);
58            }
59            _ => unreachable!(),
60        }
61        Ok(())
62    }
63
64    fn create_forwarder<P: AsRef<Path>>(
65        &self,
66        logs_dir: &Option<P>,
67        name: &str,
68        extension: &str,
69        socket: flex_client::Socket,
70    ) -> Result<SocketForwarder<O>> {
71        let writer = match logs_dir {
72            Some(logs_dir) => self
73                .writer
74                .tee(logs_dir, format!("fuzzer.{}.{}", name, extension))
75                .context(format!("failed to create file for {}", name)),
76            None => Ok(self.writer.clone()),
77        }?;
78        let forwarder = SocketForwarder::try_new(socket, &writer)
79            .context(format!("failed to create forwarder for {}", name))?;
80        Ok(forwarder)
81    }
82
83    /// Forwards output from the fuzzer to the `Writer` using each of the `SocketForwarder`s.
84    pub async fn forward_all(&self) -> Result<()> {
85        let stdout = self.stdout.clone();
86        let stdout_fut = || async move {
87            if let Some(stdout) = stdout {
88                stdout.forward_text("stdout").await.context("failed to forward stdout")?;
89            }
90            Ok::<(), Error>(())
91        };
92
93        let stderr = self.stderr.clone();
94        let stderr_fut = || async move {
95            if let Some(stderr) = stderr {
96                stderr.forward_text("stderr").await.context("failed to forward stderr")?;
97            }
98            Ok::<(), Error>(())
99        };
100
101        let syslog = self.syslog.clone();
102        let syslog_fut = || async move {
103            if let Some(syslog) = syslog {
104                syslog.forward_json("syslog").await.context("failed to forward syslog")?;
105            }
106            Ok::<(), Error>(())
107        };
108
109        try_join!(stdout_fut(), stderr_fut(), syslog_fut())?;
110        Ok(())
111    }
112}
113
/// Forwarder for a single output stream.
///
/// Clones share the same underlying reader through the `Rc`.
#[derive(Debug)]
pub struct SocketForwarder<O: OutputSink> {
    /// Read half of the async socket; mutably borrowed for the duration of a forwarding call.
    reader: Rc<RefCell<ReadHalf<flex_client::AsyncSocket>>>,
    /// Destination for the data read from the socket.
    writer: Writer<O>,
}
120
121impl<O: OutputSink> Clone for SocketForwarder<O> {
122    fn clone(&self) -> Self {
123        Self { reader: Rc::clone(&self.reader), writer: self.writer.clone() }
124    }
125}
126
127impl<O: OutputSink> SocketForwarder<O> {
128    /// Converts a a socket into a SocketForwarder.
129    ///
130    /// Returns an error if conversion to an async socket fails.
131    pub fn try_new(socket: flex_client::Socket, writer: &Writer<O>) -> Result<Self> {
132        let socket = flex_client::socket_to_async(socket);
133        let (reader, _) = socket.split();
134        Ok(Self { reader: Rc::new(RefCell::new(reader)), writer: writer.clone() })
135    }
136
137    /// Continuously forwards messages from the socket to the writer until the socket is closed.
138    pub async fn forward_text(&self, name: &str) -> Result<()> {
139        let mut reader = self.reader.borrow_mut();
140        let mut buf: [u8; 2048] = [0; 2048];
141        let mut raw = Vec::new();
142        let newline = '\n' as u8;
143        let done_marker = format!("{}\n", fuzz::DONE_MARKER);
144        let done_marker = done_marker.as_bytes();
145        loop {
146            match reader
147                .read(&mut buf)
148                .await
149                .context(format!("failed to read text data from {} socket", name))?
150            {
151                0 => {
152                    self.writer.write_all(&raw);
153                    bail!("{} from fuzzer ended prematurely", name);
154                }
155                num_read => raw.extend_from_slice(&buf[0..num_read]),
156            };
157            let data = raw;
158            raw = Vec::new();
159            for message in data.split_inclusive(|&x| x == newline) {
160                if message == done_marker {
161                    return Ok(());
162                } else if message.last() == Some(&newline) {
163                    self.writer.write_all(&message);
164                } else {
165                    raw = message.to_vec();
166                }
167            }
168        }
169    }
170
171    /// Continuously forwards JSON data from the socket to the writer until the socket is closed.
172    pub async fn forward_json(&self, name: &str) -> Result<()> {
173        let mut reader = self.reader.borrow_mut();
174        let mut buf: [u8; 2048] = [0; 2048];
175        let mut raw = Vec::new();
176        loop {
177            match reader
178                .read(&mut buf)
179                .await
180                .context(format!("failed to read JSON data from {} socket", name))?
181            {
182                0 => {
183                    self.writer.write_all(&raw);
184                    bail!("{} from fuzzer ended prematurely", name);
185                }
186                num_read => raw.extend_from_slice(&buf[0..num_read]),
187            };
188            let deserializer = Deserializer::from_slice(&raw);
189            let mut stream = deserializer.into_iter::<Vec<LogsData>>();
190            while let Some(items) = stream.next() {
191                let logs_data = match items {
192                    Err(e) if e.is_eof() => break,
193                    other => other,
194                }
195                .map_err(|e| anyhow!(format!("serde_json: {:?}", e)))
196                .context("failed to deserialize")?;
197                for log_data in logs_data.into_iter() {
198                    if let Some(message) = log_data.msg() {
199                        if message == fuzz::DONE_MARKER {
200                            return Ok(());
201                        }
202                    }
203                    self.writer.log(log_data);
204                }
205            }
206            let num_read = stream.byte_offset();
207            raw.drain(0..num_read);
208        }
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use anyhow::{Error, Result};
215    use diagnostics_data::LogsData;
216    use flex_fuchsia_fuzzer as fuzz;
217    use fuchsia_fuzzctl::{Forwarder, SocketForwarder};
218    use fuchsia_fuzzctl_test::{Test, send_log_entry};
219    use futures::{AsyncWriteExt, try_join};
220    use std::fs;
221
222    #[fuchsia::test]
223    async fn test_forward_text() -> Result<()> {
224        let mut test = Test::try_new()?;
225        let (tx, rx) = test.domain().create_stream_socket();
226        let forwarder = SocketForwarder::try_new(rx, test.writer())?;
227        let socket_fut = || async move {
228            let mut tx = flex_client::socket_to_async(tx);
229            tx.write_all(b"hello\nworld!\n").await?;
230            let done_marker = format!("{}\n", fuzz::DONE_MARKER);
231            tx.write_all(done_marker.as_bytes()).await?;
232            Ok::<(), Error>(())
233        };
234        test.output_matches("hello");
235        test.output_matches("world!");
236        try_join!(forwarder.forward_text("test"), socket_fut())?;
237        test.verify_output()
238    }
239
240    #[fuchsia::test]
241    async fn test_forward_json() -> Result<()> {
242        let mut test = Test::try_new()?;
243        let (tx, rx) = test.domain().create_stream_socket();
244        let forwarder = SocketForwarder::try_new(rx, test.writer())?;
245        let socket_fut = || async move {
246            let mut tx = flex_client::socket_to_async(tx);
247            send_log_entry(&mut tx, "hello world").await?;
248            send_log_entry(&mut tx, fuzz::DONE_MARKER).await?;
249            Ok::<(), Error>(())
250        };
251        try_join!(forwarder.forward_json("test"), socket_fut())?;
252        test.output_matches("[0.000][moniker][][I] hello world");
253        test.verify_output()
254    }
255
256    #[fuchsia::test]
257    async fn test_forward_all() -> Result<()> {
258        let mut test = Test::try_new()?;
259        let logs_dir = test.create_dir("logs")?;
260        let logs_dir = Some(logs_dir);
261        let mut forwarder = Forwarder::new(test.writer());
262
263        let (stdout_tx, stdout_rx) = test.domain().create_stream_socket();
264        forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
265
266        let (stderr_tx, stderr_rx) = test.domain().create_stream_socket();
267        forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
268
269        let (syslog_tx, syslog_rx) = test.domain().create_stream_socket();
270        forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
271
272        let done_marker = format!("{}\n", fuzz::DONE_MARKER);
273        let done_marker_bytes = done_marker.as_bytes();
274
275        let a_done_marker = format!("a{}\n", fuzz::DONE_MARKER);
276        test.output_matches(a_done_marker.clone());
277
278        let done_marker_a = format!("{}a\n", fuzz::DONE_MARKER);
279        test.output_matches(done_marker_a.clone());
280
281        let socket_fut = || async move {
282            let mut stdout_tx = flex_client::socket_to_async(stdout_tx);
283            let mut stderr_tx = flex_client::socket_to_async(stderr_tx);
284            let mut syslog_tx = flex_client::socket_to_async(syslog_tx);
285
286            // Streams can be sent in any order
287            send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
288
289            // Data sent after the done marker should not be received.
290            stdout_tx.write_all(done_marker_bytes).await?;
291            stdout_tx.write_all(b"after\n").await?;
292
293            // Done marker must be exactly delimited by newlines, and can arrive in pieces.
294            stderr_tx.write_all(a_done_marker.as_bytes()).await?;
295            stderr_tx.write_all(done_marker_a.as_bytes()).await?;
296            for i in 0..done_marker_bytes.len() {
297                stderr_tx.write_all(&done_marker_bytes[i..i + 1]).await?;
298            }
299            stderr_tx.write_all(b"after\n").await?;
300            Ok::<(), Error>(())
301        };
302
303        try_join!(forwarder.forward_all(), socket_fut())?;
304        test.verify_output()
305    }
306
307    #[fuchsia::test]
308    async fn test_forward_to_file() -> Result<()> {
309        let test = Test::try_new()?;
310
311        let logs_dir = test.create_dir("logs")?;
312        let logs_dir = Some(logs_dir);
313        let mut forwarder = Forwarder::new(test.writer());
314
315        let (stdout_tx, stdout_rx) = test.domain().create_stream_socket();
316        forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
317
318        let (stderr_tx, stderr_rx) = test.domain().create_stream_socket();
319        forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
320
321        let (syslog_tx, syslog_rx) = test.domain().create_stream_socket();
322        forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
323
324        let sockets_fut = || async move {
325            let done_marker = format!("{}\n", fuzz::DONE_MARKER);
326            let done_marker_bytes = done_marker.as_bytes();
327
328            // Write all in one shot.
329            let mut stdout_tx = flex_client::socket_to_async(stdout_tx);
330            stdout_tx.write_all(b"hello world!\n").await?;
331            stdout_tx.write_all(done_marker_bytes).await?;
332
333            // Write all in pieces.
334            let mut stderr_tx = flex_client::socket_to_async(stderr_tx);
335            stderr_tx.write_all(b"hel").await?;
336            stderr_tx.write_all(b"lo ").await?;
337            stderr_tx.write_all(b"wor").await?;
338            stderr_tx.write_all(b"ld!\n").await?;
339            stderr_tx.write_all(done_marker_bytes).await?;
340
341            // Write JSON. This should be made prettier when copying, e.g. newlines, spaces, etc.
342            let mut syslog_tx = flex_client::socket_to_async(syslog_tx);
343            send_log_entry(&mut syslog_tx, "hello world!").await?;
344            send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
345            Ok::<(), Error>(())
346        };
347        let sockets_fut = sockets_fut();
348        try_join!(forwarder.forward_all(), sockets_fut)?;
349        let logs_dir = logs_dir.unwrap();
350        assert_eq!(fs::read(logs_dir.join("fuzzer.stdout.txt"))?, b"hello world!\n");
351        assert_eq!(fs::read(logs_dir.join("fuzzer.stderr.txt"))?, b"hello world!\n");
352        let data = fs::read(logs_dir.join("fuzzer.syslog.json"))?;
353        let logs_data: LogsData = serde_json::from_slice(&data)?;
354        assert_eq!(logs_data.msg(), Some("hello world!"));
355        Ok(())
356    }
357}