1use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::channel::mpsc;
10use futures::prelude::*;
11use futures::ready;
12use futures::task::{Context, Poll};
13use std::cell::RefCell;
14use std::io::Write;
15use std::pin::Pin;
16use std::sync::Arc;
17
18pub use crate::diagnostics::LogStream;
19
20mod diagnostics;
21pub mod zstd_compress;
22
23thread_local! {
24 static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0; 1024*1024*2]);
25}
26
27pub struct SocketReadFut<'a, T, F>
29where
30 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
31{
32 socket: &'a mut flex_client::AsyncSocket,
33 on_read_fn: F,
34}
35
36impl<'a, T, F> SocketReadFut<'a, T, F>
37where
38 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
39{
40 pub fn new(socket: &'a mut flex_client::AsyncSocket, on_read_fn: F) -> Self {
41 Self { socket, on_read_fn }
42 }
43}
44
45impl<'a, T, F> Future for SocketReadFut<'a, T, F>
46where
47 F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
48{
49 type Output = Result<T, std::io::Error>;
50 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51 BUFFER.with(|b| {
52 let mut b = b.borrow_mut();
53 let mut_self = self.get_mut();
54 let len = ready!(Pin::new(&mut mut_self.socket).poll_read(cx, &mut *b)?);
55 match len {
56 0 => Poll::Ready((mut_self.on_read_fn)(None)),
57 l => Poll::Ready((mut_self.on_read_fn)(Some(&b[..l]))),
58 }
59 })
60 }
61}
62
63pub async fn collect_string_from_socket(
64 socket: flex_client::Socket,
65) -> Result<String, anyhow::Error> {
66 let (s, mut r) = mpsc::channel(1024);
67 let task = fasync::Task::spawn(collect_and_send_string_output(socket, s));
68 let mut ret = String::new();
69 while let Some(content) = r.next().await {
70 ret.push_str(&content);
71 }
72 task.await?;
73 Ok(ret)
74}
75
76pub async fn collect_and_send_string_output(
77 socket: flex_client::Socket,
78 mut sender: mpsc::Sender<String>,
79) -> Result<(), anyhow::Error> {
80 let mut async_socket = flex_client::socket_to_async(socket);
81 loop {
82 let maybe_string = SocketReadFut::new(&mut async_socket, |maybe_buf| {
83 Ok(maybe_buf.map(|buf| String::from_utf8_lossy(buf).into()))
84 })
85 .await?;
86 match maybe_string {
87 Some(string) => sender.send(string).await?,
88 None => return Ok(()),
89 }
90 }
91}
92
93pub struct StdoutBuffer<W: Write + Send + 'static> {
100 inner: Arc<Mutex<StdoutBufferInner<W>>>,
101 _timer: fuchsia_async::Task<()>,
102}
103
104impl<W: Write + Send + 'static> StdoutBuffer<W> {
105 pub fn new(duration: std::time::Duration, writer: W, max_capacity: usize) -> Self {
111 let (inner, timer) = StdoutBufferInner::new(duration, writer, max_capacity);
112 Self { inner, _timer: timer }
113 }
114}
115
116impl<W: Write + Send + 'static> Write for StdoutBuffer<W> {
117 fn flush(&mut self) -> Result<(), std::io::Error> {
118 self.inner.lock().flush()
119 }
120
121 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
122 self.inner.lock().write(bytes)
123 }
124}
125
126struct StdoutBufferInner<W: Write + Send + 'static> {
127 writer: W,
128 buffer: Option<Vec<u8>>,
130 stop_buffer_error: Option<std::io::Error>,
131 max_capacity: usize,
132}
133
134impl<W: Write + Send + 'static> StdoutBufferInner<W> {
135 fn new(
136 duration: std::time::Duration,
137 writer: W,
138 max_capacity: usize,
139 ) -> (Arc<Mutex<Self>>, fuchsia_async::Task<()>) {
140 let new_self = Arc::new(Mutex::new(StdoutBufferInner {
141 writer,
142 buffer: Some(Vec::with_capacity(max_capacity)),
143 stop_buffer_error: None,
144 max_capacity,
145 }));
146
147 let timer = fuchsia_async::Timer::new(duration);
148 let log_buffer = Arc::downgrade(&new_self);
149 let f = async move {
150 timer.await;
151 if let Some(log_buffer) = log_buffer.upgrade() {
152 log_buffer.lock().stop_buffering();
153 }
154 };
155
156 (new_self, fuchsia_async::Task::spawn(f))
157 }
158
159 fn stop_buffering(&mut self) {
160 if let Some(buf) = self.buffer.take() {
161 if let Err(e) = self.writer.write_all(&buf) {
162 self.stop_buffer_error = Some(e);
163 }
164 }
165 }
166}
167
168impl<W: Write + Send + 'static> Write for StdoutBufferInner<W> {
169 fn flush(&mut self) -> Result<(), std::io::Error> {
170 self.stop_buffering();
171 match self.stop_buffer_error.take() {
172 Some(e) => Err(e),
173 None => self.writer.flush(),
174 }
175 }
176
177 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
178 if let Some(e) = self.stop_buffer_error.take() {
179 return Err(e);
180 }
181 match self.buffer.as_mut() {
182 None => self.writer.write(bytes),
183 Some(buf) if buf.len() + bytes.len() > self.max_capacity => {
184 self.writer.write_all(&buf)?;
185 buf.truncate(0);
186 self.writer.write(bytes)
187 }
188 Some(buf) => Write::write(buf, bytes),
189 }
190 }
191}
192
193impl<W: Write + Send + 'static> Drop for StdoutBufferInner<W> {
194 fn drop(&mut self) {
195 let _ = self.flush();
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202 use futures::StreamExt;
203 use pretty_assertions::assert_eq;
204
205 async fn collect_until_eq<S: Stream<Item = String> + Unpin>(mut stream: S, target: &str) {
206 let mut accumulator = "".to_string();
207 while accumulator.len() < target.len() {
208 match stream.next().await {
209 Some(string) => accumulator.push_str(&string),
210 None => panic!(
211 "Expected string '{}' but stream terminated after '{}'",
212 target, accumulator
213 ),
214 }
215 }
216 assert_eq!(target, accumulator);
217 }
218
219 #[fuchsia_async::run_singlethreaded(test)]
220 async fn collect_test_stdout() {
221 #[cfg(feature = "fdomain")]
222 let client = fdomain_local::local_client_empty();
223 #[cfg(not(feature = "fdomain"))]
224 let client = flex_client::fidl::ZirconClient;
225 let (sock_server, sock_client) = client.create_stream_socket();
226 let mut sock_server = flex_client::socket_to_async(sock_server);
227
228 let (sender, mut recv) = mpsc::channel(1);
229
230 let fut =
231 fuchsia_async::Task::spawn(collect_and_send_string_output(sock_client, sender.into()));
232
233 sock_server.write_all(b"test message 1").await.expect("Can't write msg to socket");
234 sock_server.write_all(b"test message 2").await.expect("Can't write msg to socket");
235 sock_server.write_all(b"test message 3").await.expect("Can't write msg to socket");
236
237 collect_until_eq(&mut recv, "test message 1test message 2test message 3").await;
238
239 sock_server.write_all(b"test message 4").await.expect("Can't write msg to socket");
241 collect_until_eq(&mut recv, "test message 4").await;
242
243 sock_server.write_all(b"test message 5").await.expect("Can't write msg to socket");
245 std::mem::drop(sock_server); fut.await.expect("log collection should not fail");
247 collect_until_eq(&mut recv, "test message 5").await;
248
249 let msg = recv.next().await;
251 assert_eq!(msg, None);
252 }
253
254 #[cfg(target_os = "fuchsia")]
256 mod stdout {
257 use super::*;
258 use fuchsia_async::TestExecutor;
259
260 use pretty_assertions::assert_eq;
261 use std::ops::Add;
262
263 struct MutexBytes(Arc<Mutex<Vec<u8>>>);
264
265 impl Write for MutexBytes {
266 fn flush(&mut self) -> Result<(), std::io::Error> {
267 Write::flush(&mut *self.0.lock())
268 }
269
270 fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
271 Write::write(&mut *self.0.lock(), bytes)
272 }
273 }
274
275 #[test]
276 fn log_buffer_without_timeout() {
277 let mut executor = TestExecutor::new_with_fake_time();
278 let output = Arc::new(Mutex::new(vec![]));
279 let writer = MutexBytes(output.clone());
280 let (log_buffer, mut timeout_task) =
281 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
282
283 write!(log_buffer.lock(), "message1").expect("write message");
284 assert_eq!(*output.lock(), b"");
285 write!(log_buffer.lock(), "message2").expect("write message");
286 assert_eq!(*output.lock(), b"");
287
288 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
289 assert_eq!(*output.lock(), b"");
290
291 log_buffer.lock().flush().expect("flush buffer");
292 assert_eq!(*output.lock(), b"message1message2");
293 }
294
295 #[test]
296 fn log_buffer_flush_on_drop() {
297 let mut executor = TestExecutor::new_with_fake_time();
298 let output = Arc::new(Mutex::new(vec![]));
299 let writer = MutexBytes(output.clone());
300 let (log_buffer, mut timeout_task) =
301 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
302
303 write!(log_buffer.lock(), "message1").expect("write message");
304 assert_eq!(*output.lock(), b"");
305 write!(log_buffer.lock(), "message2").expect("write message");
306 assert_eq!(*output.lock(), b"");
307
308 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
309 assert_eq!(*output.lock(), b"");
310
311 drop(log_buffer);
312 assert_eq!(*output.lock(), b"message1message2");
313 }
314
315 #[test]
316 fn log_buffer_with_timeout() {
317 let mut executor = TestExecutor::new_with_fake_time();
318 let output = Arc::new(Mutex::new(vec![]));
319 let writer = MutexBytes(output.clone());
320 let (log_buffer, mut timeout_task) =
321 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
322
323 write!(log_buffer.lock(), "message1").expect("write message");
324 assert_eq!(*output.lock(), b"");
325 write!(log_buffer.lock(), "message2").expect("write message");
326 assert_eq!(*output.lock(), b"");
327
328 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
329 assert_eq!(*output.lock(), b"");
330
331 executor.set_fake_time(executor.now().add(zx::MonotonicDuration::from_seconds(6)));
332 executor.wake_next_timer();
333 assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Ready(()));
334 assert_eq!(*output.lock(), b"message1message2");
335 }
336
337 #[test]
338 fn log_buffer_capacity_reached() {
339 let _executor = TestExecutor::new_with_fake_time();
340 let output = Arc::new(Mutex::new(vec![]));
341 let writer = MutexBytes(output.clone());
342 let (log_buffer, _timeout_task) =
343 StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 10);
344
345 write!(log_buffer.lock(), "message1").expect("write message");
346 assert_eq!(*output.lock(), b"");
347 write!(log_buffer.lock(), "message2").expect("write message");
348 assert_eq!(*output.lock(), b"message1message2");
349
350 write!(log_buffer.lock(), "message1").expect("write message");
352 assert_eq!(*output.lock(), b"message1message2");
353 write!(log_buffer.lock(), "message2").expect("write message");
354 assert_eq!(*output.lock(), b"message1message2message1message2");
355 }
356 }
357}