fuchsia_fuzzctl/
diagnostics.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::writer::{OutputSink, Writer};
6use anyhow::{anyhow, bail, Context as _, Error, Result};
7use diagnostics_data::LogsData;
8use fidl_fuchsia_fuzzer as fuzz;
9use futures::io::ReadHalf;
10use futures::{try_join, AsyncReadExt};
11use serde_json::Deserializer;
12use std::cell::RefCell;
13use std::path::Path;
14use std::rc::Rc;
15
16/// Compostion of `SocketForwarder`s for standard output, standard errors, and system logs.
17#[derive(Debug)]
18pub struct Forwarder<O: OutputSink> {
19    stdout: Option<SocketForwarder<O>>,
20    stderr: Option<SocketForwarder<O>>,
21    syslog: Option<SocketForwarder<O>>,
22    writer: Writer<O>,
23}
24
25impl<O: OutputSink> Forwarder<O> {
26    /// Creates a `Forwarder` that can forward data to the `writer`.
27    ///
28    /// Output will also be saved to the following files under the given `logs_dir` directory:
29    ///
30    ///   * fuzzer.stdout.txt
31    ///   * fuzzer.stderr.txt
32    ///   * fuzzer.syslog.json
33    ///
34    pub fn new(writer: &Writer<O>) -> Self {
35        Self { stdout: None, stderr: None, syslog: None, writer: writer.clone() }
36    }
37
38    /// Registers the provided output socket.
39    pub fn set_output<P: AsRef<Path>>(
40        &mut self,
41        socket: fidl::Socket,
42        output: fuzz::TestOutput,
43        logs_dir: &Option<P>,
44    ) -> Result<()> {
45        match output {
46            fuzz::TestOutput::Stdout => {
47                let forwarder = self.create_forwarder(logs_dir, "stdout", "txt", socket)?;
48                self.stdout = Some(forwarder);
49            }
50            fuzz::TestOutput::Stderr => {
51                let forwarder = self.create_forwarder(logs_dir, "stderr", "txt", socket)?;
52                self.stderr = Some(forwarder);
53            }
54            fuzz::TestOutput::Syslog => {
55                let forwarder = self.create_forwarder(logs_dir, "syslog", "json", socket)?;
56                self.syslog = Some(forwarder);
57            }
58            _ => unreachable!(),
59        }
60        Ok(())
61    }
62
63    fn create_forwarder<P: AsRef<Path>>(
64        &self,
65        logs_dir: &Option<P>,
66        name: &str,
67        extension: &str,
68        socket: fidl::Socket,
69    ) -> Result<SocketForwarder<O>> {
70        let writer = match logs_dir {
71            Some(logs_dir) => self
72                .writer
73                .tee(logs_dir, format!("fuzzer.{}.{}", name, extension))
74                .context(format!("failed to create file for {}", name)),
75            None => Ok(self.writer.clone()),
76        }?;
77        let forwarder = SocketForwarder::try_new(socket, &writer)
78            .context(format!("failed to create forwarder for {}", name))?;
79        Ok(forwarder)
80    }
81
82    /// Forwards output from the fuzzer to the `Writer` using each of the `SocketForwarder`s.
83    pub async fn forward_all(&self) -> Result<()> {
84        let stdout = self.stdout.clone();
85        let stdout_fut = || async move {
86            if let Some(stdout) = stdout {
87                stdout.forward_text("stdout").await.context("failed to forward stdout")?;
88            }
89            Ok::<(), Error>(())
90        };
91
92        let stderr = self.stderr.clone();
93        let stderr_fut = || async move {
94            if let Some(stderr) = stderr {
95                stderr.forward_text("stderr").await.context("failed to forward stderr")?;
96            }
97            Ok::<(), Error>(())
98        };
99
100        let syslog = self.syslog.clone();
101        let syslog_fut = || async move {
102            if let Some(syslog) = syslog {
103                syslog.forward_json("syslog").await.context("failed to forward syslog")?;
104            }
105            Ok::<(), Error>(())
106        };
107
108        try_join!(stdout_fut(), stderr_fut(), syslog_fut())?;
109        Ok(())
110    }
111}
112
113/// Forwarder for a single output stream.
114#[derive(Debug)]
115pub struct SocketForwarder<O: OutputSink> {
116    reader: Rc<RefCell<ReadHalf<fidl::AsyncSocket>>>,
117    writer: Writer<O>,
118}
119
120impl<O: OutputSink> Clone for SocketForwarder<O> {
121    fn clone(&self) -> Self {
122        Self { reader: Rc::clone(&self.reader), writer: self.writer.clone() }
123    }
124}
125
126impl<O: OutputSink> SocketForwarder<O> {
127    /// Converts a a socket into a SocketForwarder.
128    ///
129    /// Returns an error if conversion to an async socket fails.
130    pub fn try_new(socket: fidl::Socket, writer: &Writer<O>) -> Result<Self> {
131        let socket = fidl::AsyncSocket::from_socket(socket);
132        let (reader, _) = socket.split();
133        Ok(Self { reader: Rc::new(RefCell::new(reader)), writer: writer.clone() })
134    }
135
136    /// Continuously forwards messages from the socket to the writer until the socket is closed.
137    pub async fn forward_text(&self, name: &str) -> Result<()> {
138        let mut reader = self.reader.borrow_mut();
139        let mut buf: [u8; 2048] = [0; 2048];
140        let mut raw = Vec::new();
141        let newline = '\n' as u8;
142        let done_marker = format!("{}\n", fuzz::DONE_MARKER);
143        let done_marker = done_marker.as_bytes();
144        loop {
145            match reader
146                .read(&mut buf)
147                .await
148                .context(format!("failed to read text data from {} socket", name))?
149            {
150                0 => {
151                    self.writer.write_all(&raw);
152                    bail!("{} from fuzzer ended prematurely", name);
153                }
154                num_read => raw.extend_from_slice(&buf[0..num_read]),
155            };
156            let data = raw;
157            raw = Vec::new();
158            for message in data.split_inclusive(|&x| x == newline) {
159                if message == done_marker {
160                    return Ok(());
161                } else if message.last() == Some(&newline) {
162                    self.writer.write_all(&message);
163                } else {
164                    raw = message.to_vec();
165                }
166            }
167        }
168    }
169
170    /// Continuously forwards JSON data from the socket to the writer until the socket is closed.
171    pub async fn forward_json(&self, name: &str) -> Result<()> {
172        let mut reader = self.reader.borrow_mut();
173        let mut buf: [u8; 2048] = [0; 2048];
174        let mut raw = Vec::new();
175        loop {
176            match reader
177                .read(&mut buf)
178                .await
179                .context(format!("failed to read JSON data from {} socket", name))?
180            {
181                0 => {
182                    self.writer.write_all(&raw);
183                    bail!("{} from fuzzer ended prematurely", name);
184                }
185                num_read => raw.extend_from_slice(&buf[0..num_read]),
186            };
187            let deserializer = Deserializer::from_slice(&raw);
188            let mut stream = deserializer.into_iter::<Vec<LogsData>>();
189            while let Some(items) = stream.next() {
190                let logs_data = match items {
191                    Err(e) if e.is_eof() => break,
192                    other => other,
193                }
194                .map_err(|e| anyhow!(format!("serde_json: {:?}", e)))
195                .context("failed to deserialize")?;
196                for log_data in logs_data.into_iter() {
197                    if let Some(message) = log_data.msg() {
198                        if message == fuzz::DONE_MARKER {
199                            return Ok(());
200                        }
201                    }
202                    self.writer.log(log_data);
203                }
204            }
205            let num_read = stream.byte_offset();
206            raw.drain(0..num_read);
207        }
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use anyhow::{Error, Result};
214    use diagnostics_data::LogsData;
215    use fidl::Socket;
216    use fidl_fuchsia_fuzzer as fuzz;
217    use fuchsia_fuzzctl::{Forwarder, SocketForwarder};
218    use fuchsia_fuzzctl_test::{send_log_entry, Test};
219    use futures::{try_join, AsyncWriteExt};
220    use std::fs;
221
222    #[fuchsia::test]
223    async fn test_forward_text() -> Result<()> {
224        let mut test = Test::try_new()?;
225        let (tx, rx) = Socket::create_stream();
226        let forwarder = SocketForwarder::try_new(rx, test.writer())?;
227        let socket_fut = || async move {
228            let mut tx = fidl::AsyncSocket::from_socket(tx);
229            tx.write_all(b"hello\nworld!\n").await?;
230            let done_marker = format!("{}\n", fuzz::DONE_MARKER);
231            tx.write_all(done_marker.as_bytes()).await?;
232            Ok::<(), Error>(())
233        };
234        test.output_matches("hello");
235        test.output_matches("world!");
236        try_join!(forwarder.forward_text("test"), socket_fut())?;
237        test.verify_output()
238    }
239
240    #[fuchsia::test]
241    async fn test_forward_json() -> Result<()> {
242        let mut test = Test::try_new()?;
243        let (tx, rx) = Socket::create_stream();
244        let forwarder = SocketForwarder::try_new(rx, test.writer())?;
245        let socket_fut = || async move {
246            let mut tx = fidl::AsyncSocket::from_socket(tx);
247            send_log_entry(&mut tx, "hello world").await?;
248            send_log_entry(&mut tx, fuzz::DONE_MARKER).await?;
249            Ok::<(), Error>(())
250        };
251        try_join!(forwarder.forward_json("test"), socket_fut())?;
252        test.output_matches("[0.000][moniker][][I] hello world");
253        test.verify_output()
254    }
255
256    #[fuchsia::test]
257    async fn test_forward_all() -> Result<()> {
258        let mut test = Test::try_new()?;
259        let logs_dir = test.create_dir("logs")?;
260        let logs_dir = Some(logs_dir);
261        let mut forwarder = Forwarder::new(test.writer());
262
263        let (stdout_tx, stdout_rx) = Socket::create_stream();
264        forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
265
266        let (stderr_tx, stderr_rx) = Socket::create_stream();
267        forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
268
269        let (syslog_tx, syslog_rx) = Socket::create_stream();
270        forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
271
272        let done_marker = format!("{}\n", fuzz::DONE_MARKER);
273        let done_marker_bytes = done_marker.as_bytes();
274
275        let a_done_marker = format!("a{}\n", fuzz::DONE_MARKER);
276        test.output_matches(a_done_marker.clone());
277
278        let done_marker_a = format!("{}a\n", fuzz::DONE_MARKER);
279        test.output_matches(done_marker_a.clone());
280
281        let socket_fut = || async move {
282            let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
283            let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
284            let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
285
286            // Streams can be sent in any order
287            send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
288
289            // Data sent after the done marker should not be received.
290            stdout_tx.write_all(done_marker_bytes).await?;
291            stdout_tx.write_all(b"after\n").await?;
292
293            // Done marker must be exactly delimited by newlines, and can arrive in pieces.
294            stderr_tx.write_all(a_done_marker.as_bytes()).await?;
295            stderr_tx.write_all(done_marker_a.as_bytes()).await?;
296            for i in 0..done_marker_bytes.len() {
297                stderr_tx.write_all(&done_marker_bytes[i..i + 1]).await?;
298            }
299            stderr_tx.write_all(b"after\n").await?;
300            Ok::<(), Error>(())
301        };
302
303        try_join!(forwarder.forward_all(), socket_fut())?;
304        test.verify_output()
305    }
306
307    #[fuchsia::test]
308    async fn test_forward_to_file() -> Result<()> {
309        let test = Test::try_new()?;
310
311        let logs_dir = test.create_dir("logs")?;
312        let logs_dir = Some(logs_dir);
313        let mut forwarder = Forwarder::new(test.writer());
314
315        let (stdout_tx, stdout_rx) = Socket::create_stream();
316        forwarder.set_output(stdout_rx, fuzz::TestOutput::Stdout, &logs_dir)?;
317
318        let (stderr_tx, stderr_rx) = Socket::create_stream();
319        forwarder.set_output(stderr_rx, fuzz::TestOutput::Stderr, &logs_dir)?;
320
321        let (syslog_tx, syslog_rx) = Socket::create_stream();
322        forwarder.set_output(syslog_rx, fuzz::TestOutput::Syslog, &logs_dir)?;
323
324        let sockets_fut = || async move {
325            let done_marker = format!("{}\n", fuzz::DONE_MARKER);
326            let done_marker_bytes = done_marker.as_bytes();
327
328            // Write all in one shot.
329            let mut stdout_tx = fidl::AsyncSocket::from_socket(stdout_tx);
330            stdout_tx.write_all(b"hello world!\n").await?;
331            stdout_tx.write_all(done_marker_bytes).await?;
332
333            // Write all in pieces.
334            let mut stderr_tx = fidl::AsyncSocket::from_socket(stderr_tx);
335            stderr_tx.write_all(b"hel").await?;
336            stderr_tx.write_all(b"lo ").await?;
337            stderr_tx.write_all(b"wor").await?;
338            stderr_tx.write_all(b"ld!\n").await?;
339            stderr_tx.write_all(done_marker_bytes).await?;
340
341            // Write JSON. This should be made prettier when copying, e.g. newlines, spaces, etc.
342            let mut syslog_tx = fidl::AsyncSocket::from_socket(syslog_tx);
343            send_log_entry(&mut syslog_tx, "hello world!").await?;
344            send_log_entry(&mut syslog_tx, fuzz::DONE_MARKER).await?;
345            Ok::<(), Error>(())
346        };
347        let sockets_fut = sockets_fut();
348        try_join!(forwarder.forward_all(), sockets_fut)?;
349        let logs_dir = logs_dir.unwrap();
350        assert_eq!(fs::read(logs_dir.join("fuzzer.stdout.txt"))?, b"hello world!\n");
351        assert_eq!(fs::read(logs_dir.join("fuzzer.stderr.txt"))?, b"hello world!\n");
352        let data = fs::read(logs_dir.join("fuzzer.syslog.json"))?;
353        let logs_data: LogsData = serde_json::from_slice(&data)?;
354        assert_eq!(logs_data.msg(), Some("hello world!"));
355        Ok(())
356    }
357}