1use fuchsia_async as fasync;
8use futures::{AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _, future};
9use std::num::NonZeroUsize;
10use thiserror::Error;
11
/// Size, in bytes, of the scratch buffer handed to each socket read while
/// draining a log stream.
const SOCKET_BUFFER_SIZE: usize = 2 * 1024;

/// Number of buffered bytes (bytes received without a terminating newline)
/// above which the line buffer is flushed, in a chunk of exactly this size.
const MAX_LINE_BUFFER_LENGTH: usize = 4 * 1024;
18
19#[derive(Debug, PartialEq, Eq, Error, Clone)]
21pub enum LoggerError {
22 #[error("cannot create socket: {:?}", _0)]
23 CreateSocket(zx::Status),
24
25 #[error("cannot duplicate socket: {:?}", _0)]
26 DuplicateSocket(zx::Status),
27
28 #[error("invalid socket: {:?}", _0)]
29 InvalidSocket(zx::Status),
30}
31
32#[derive(Debug, Error)]
34pub enum LogError {
35 #[error("can't get logs: {:?}", _0)]
37 Read(std::io::Error),
38
39 #[error("can't write logs: {:?}", _0)]
41 Write(std::io::Error),
42}
43
44pub fn create_std_combined_log_stream()
47-> Result<(LoggerStream, zx::NullableHandle, zx::NullableHandle), LoggerError> {
48 let (client, log) = zx::Socket::create_stream();
49
50 let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
51 let clone =
52 log.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(LoggerError::DuplicateSocket)?;
53
54 Ok((stream, log.into_handle(), clone.into_handle()))
55}
56
57pub fn create_log_stream() -> Result<(LoggerStream, zx::NullableHandle), LoggerError> {
60 let (client, log) = zx::Socket::create_stream();
61
62 let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
63
64 Ok((stream, log.into_handle()))
65}
66pub struct LogStreamReader {
68 fut: future::RemoteHandle<Result<Vec<u8>, LogError>>,
69}
70
71impl LogStreamReader {
72 pub fn new(logger: LoggerStream) -> Self {
73 let (logger_handle, logger_fut) = logger.read_to_end().remote_handle();
74 fasync::Task::spawn(logger_handle).detach();
75 Self { fut: logger_fut }
76 }
77
78 pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
80 self.fut.await
81 }
82}
83
84pub struct LoggerStream {
88 socket: fasync::Socket,
89}
90
91impl Unpin for LoggerStream {}
92
93impl LoggerStream {
94 pub fn new(socket: zx::Socket) -> Result<LoggerStream, zx::Status> {
97 let l = LoggerStream { socket: fasync::Socket::from_socket(socket) };
98 Ok(l)
99 }
100
101 pub async fn read_to_end(mut self) -> Result<Vec<u8>, LogError> {
103 let mut buffer: Vec<u8> = Vec::new();
104 let _bytes_read = self.socket.read_to_end(&mut buffer).await.map_err(LogError::Read)?;
105 Ok(buffer)
106 }
107
108 pub async fn buffer_drain_and_peek(
113 mut self,
114 writer: &mut SocketLogWriter,
115 peek_fn: Option<impl Fn(&[u8])>,
116 ) -> Result<(), LogError> {
117 let mut line_buffer: Vec<u8> = Vec::with_capacity(MAX_LINE_BUFFER_LENGTH);
118 let mut socket_buffer: Vec<u8> = vec![0; SOCKET_BUFFER_SIZE];
119
120 while let Some(bytes_read) = NonZeroUsize::new(
121 self.socket.read(&mut socket_buffer[..]).await.map_err(LogError::Read)?,
122 ) {
123 let bytes_read = bytes_read.get();
124
125 let newline_iter = socket_buffer[..bytes_read]
126 .iter()
127 .enumerate()
128 .filter_map(|(i, &b)| if b == b'\n' { Some(i) } else { None });
129
130 let mut prev_offset = 0;
131 for idx in newline_iter {
132 let line = &socket_buffer[prev_offset..idx + 1];
133 if !line_buffer.is_empty() {
134 writer.write(line_buffer.drain(..).as_slice()).await?;
135 }
136 if let Some(peek) = &peek_fn {
137 peek(line);
138 }
139 writer.write(line).await?;
140 prev_offset = idx + 1;
141 }
142 if prev_offset != bytes_read {
143 line_buffer.extend_from_slice(&socket_buffer[prev_offset..bytes_read]);
144 }
145
146 if line_buffer.len() > MAX_LINE_BUFFER_LENGTH {
147 let bytes = &line_buffer[..MAX_LINE_BUFFER_LENGTH];
148 if let Some(peek) = &peek_fn {
149 peek(bytes);
150 }
151 writer.write(bytes).await?;
152 line_buffer.drain(..MAX_LINE_BUFFER_LENGTH);
153 }
154 }
155
156 if !line_buffer.is_empty() {
157 let bytes = &line_buffer[..];
158 if let Some(peek) = &peek_fn {
159 peek(bytes);
160 }
161 writer.write(bytes).await?;
162 }
163
164 Ok(())
165 }
166
167 pub async fn buffer_and_drain(self, writer: &mut SocketLogWriter) -> Result<(), LogError> {
169 self.buffer_drain_and_peek(writer, None::<fn(&[u8])>).await
170 }
171
172 pub fn take_socket(self) -> fasync::Socket {
174 self.socket
175 }
176}
177
178pub struct SocketLogWriter {
180 logger: fasync::Socket,
181}
182
183impl SocketLogWriter {
184 pub fn new(logger: fasync::Socket) -> Self {
185 Self { logger }
186 }
187
188 pub async fn write_str(&mut self, s: &str) -> Result<(), LogError> {
189 self.write(s.as_bytes()).await
190 }
191
192 pub async fn write(&mut self, bytes: &[u8]) -> Result<(), LogError> {
193 self.logger.write_all(bytes).await.map_err(LogError::Write)
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::{Context as _, Error, format_err};
    use assert_matches::assert_matches;
    use futures::{TryStreamExt as _, try_join};
    use rand::distr::{Alphanumeric, SampleString as _};
    use rand::rng;
    use std::sync::mpsc;
    use test_case::test_case;

    /// Bytes written through a `SocketLogWriter` come back, concatenated,
    /// from a `LogStreamReader` on the other end of the socket.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn log_writer_reader_work() {
        let (writer_end, reader_end) = zx::Socket::create_stream();
        let mut log_writer = SocketLogWriter::new(fasync::Socket::from_socket(writer_end));

        let reader = LogStreamReader::new(LoggerStream::new(reader_end).unwrap());

        for text in ["this is string one.", "this is string two."] {
            log_writer.write_str(text).await.unwrap();
        }
        // Closing the writing end lets the reader reach end of stream.
        drop(log_writer);

        let bytes = reader.get_logs().await.unwrap();
        let actual = std::str::from_utf8(&bytes).unwrap();
        assert_eq!(actual, "this is string one.this is string two.");
    }

    /// `read_to_end` returns exactly what was written, for a short and a
    /// large message.
    #[test_case(String::from("Hello World!") ; "consumes_simple_msg")]
    #[test_case(get_random_string(10000) ; "consumes_large_msg")]
    #[fasync::run_singlethreaded(test)]
    async fn logger_stream_read_to_end(msg: String) -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;

        take_and_write_to_socket(tx, &msg)?;
        let raw = stream.read_to_end().await.context("Failed to read from socket")?;
        let actual = std::str::from_utf8(&raw).context("Failed to parse bytes")?;

        assert_eq!(actual, msg);
        Ok(())
    }

    /// Several writes are returned by `read_to_end` as one concatenated
    /// byte sequence.
    #[fasync::run_singlethreaded(test)]
    async fn logger_stream_read_to_end_consumes_concat_msgs() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let msgs: Vec<String> = ["Hello World!", "Hola Mundo!", "你好,世界!"]
            .iter()
            .map(|text| text.to_string())
            .collect();

        for msg in &msgs {
            write_to_socket(&tx, msg)?;
        }
        drop(tx);
        let raw = stream.read_to_end().await.context("Failed to read from socket")?;
        let actual = std::str::from_utf8(&raw).context("Failed to parse bytes")?;

        assert_eq!(actual, msgs.concat());
        Ok(())
    }

    /// Each newline-terminated line (and the unterminated tail) is written as
    /// its own message, and the peek callback sees each of them.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_reads_each_line_as_a_new_message() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;
        let msg = "Hello World\nHola Mundo!\n你好,世界!";

        let (tx_peeks, rx_peeks) = mpsc::channel();

        take_and_write_to_socket(tx, msg)?;
        let (actual, ()) = try_join!(read_all_messages(rx), async move {
            let on_peek = move |line: &[u8]| tx_peeks.send(line.len()).unwrap();
            stream
                .buffer_drain_and_peek(&mut logger, Some(on_peek))
                .await
                .context("Failed to drain stream")
        },)?;

        let expected: Vec<String> = ["Hello World\n", "Hola Mundo!\n", "你好,世界!"]
            .iter()
            .map(|line| line.to_string())
            .collect();
        assert_eq!(actual, expected);

        // The sender was moved into the finished async block, so the channel
        // is closed and this iteration terminates.
        let lengths: Vec<usize> = rx_peeks.iter().collect();
        let expected_lengths: Vec<usize> = expected.iter().map(String::len).collect();

        assert_eq!(lengths, expected_lengths);

        Ok(())
    }

    /// A message longer than `MAX_LINE_BUFFER_LENGTH` with no newline is
    /// forwarded as one full-size chunk followed by the remainder.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_does_not_buffer_past_maximum_size() -> Result<(), Error> {
        let msg = get_random_string(MAX_LINE_BUFFER_LENGTH + 10);
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;

        let (tx_peeks, rx_peeks) = mpsc::channel();

        take_and_write_to_socket(tx, &msg)?;
        let (actual, ()) = try_join!(read_all_messages(rx), async move {
            let on_peek = move |line: &[u8]| {
                tx_peeks.send(line.len()).unwrap();
            };
            stream
                .buffer_drain_and_peek(&mut logger, Some(on_peek))
                .await
                .context("Failed to drain stream")
        },)?;

        let lengths: Vec<usize> = rx_peeks.iter().collect();

        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0], msg[0..MAX_LINE_BUFFER_LENGTH]);
        assert_eq!(actual[1], msg[MAX_LINE_BUFFER_LENGTH..]);

        assert_eq!(lengths, vec![MAX_LINE_BUFFER_LENGTH, 10]);

        Ok(())
    }

    /// Bytes without a newline only reach the output once the input socket
    /// is closed, at which point the whole buffer is written at once.
    ///
    /// NOTE(review): the `SHOULD_WAIT` assertions run before this block first
    /// awaits, so the draining future may not have been polled yet at that
    /// point — confirm these checks really exercise the buffering.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_dumps_full_buffer_if_no_newline_seen() -> Result<(), Error> {
        let (stream, tx) = create_logger_stream()?;
        let (mut logger, rx) = create_datagram_logger()?;

        let ((), ()) = try_join!(
            async move {
                let msg = get_random_string(SOCKET_BUFFER_SIZE);
                // Everything except the final byte.
                write_to_socket(&tx, &msg[..SOCKET_BUFFER_SIZE - 1])?;

                let rx = rx.into_zx_socket();
                let mut buffer = vec![0u8; SOCKET_BUFFER_SIZE];
                let first_attempt = rx.read(&mut buffer);
                assert_eq!(first_attempt, Err(zx::Status::SHOULD_WAIT));

                // The final byte.
                write_to_socket(&tx, &msg[SOCKET_BUFFER_SIZE - 1..SOCKET_BUFFER_SIZE])?;

                let second_attempt = rx.read(&mut buffer);
                assert_eq!(second_attempt, Err(zx::Status::SHOULD_WAIT));

                // Closing the input is what releases the buffered bytes.
                drop(tx);

                let mut rx = fasync::Socket::from_socket(rx);
                let bytes_read =
                    rx.read(&mut buffer).await.context("Failed to read from socket")?;
                let msg_written = std::str::from_utf8(&buffer).context("Failed to parse bytes")?;

                assert_eq!(bytes_read, SOCKET_BUFFER_SIZE);
                assert_eq!(msg_written, msg);

                Ok(())
            },
            async move { stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream") },
        )?;

        Ok(())
    }

    /// A failing socket read surfaces as `LogError::Read`.
    #[fasync::run_singlethreaded(test)]
    async fn buffer_and_drain_return_error_if_stream_polls_err() -> Result<(), Error> {
        let (_tx, rx) = zx::Socket::create_stream();
        // Duplicate with `BASIC` rights only, so that reading is expected to fail.
        let restricted_rx = rx.duplicate_handle(zx::Rights::BASIC).expect("duplicate");
        let stream = LoggerStream::new(restricted_rx).context("Failed to create LoggerStream")?;
        let (mut logger, _rx) = create_datagram_logger()?;

        let outcome = stream.buffer_and_drain(&mut logger).await;

        assert_matches!(outcome, Err(LogError::Read(_)));
        Ok(())
    }

    /// Reads datagrams from `socket` until the stream ends and returns each
    /// one decoded as UTF-8.
    async fn read_all_messages(socket: fasync::Socket) -> Result<Vec<String>, Error> {
        let mut messages = Vec::new();
        let mut datagrams = socket.into_datagram_stream();
        while let Some(bytes) =
            datagrams.try_next().await.context("Failed to read socket stream")?
        {
            let text = std::str::from_utf8(&bytes).context("Failed to parse bytes into utf8")?;
            messages.push(text.to_owned());
        }

        Ok(messages)
    }

    /// Writes `message` to `socket`, then drops the socket, closing that end.
    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
        write_to_socket(&socket, message)
    }

    /// Writes `message` to `socket` and fails unless every byte was accepted.
    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
        let bytes_written =
            socket.write(message.as_bytes()).context("Failed to write to socket")?;
        if bytes_written == message.len() {
            Ok(())
        } else {
            Err(format_err!(
                "Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}",
                message.len(),
                bytes_written
            ))
        }
    }

    /// Creates a datagram socket pair and returns a writer on one end and the
    /// async socket for the other end.
    fn create_datagram_logger() -> Result<(SocketLogWriter, fasync::Socket), Error> {
        let (tx, rx) = zx::Socket::create_datagram();
        Ok((
            SocketLogWriter::new(fasync::Socket::from_socket(tx)),
            fasync::Socket::from_socket(rx),
        ))
    }

    /// Creates a stream socket pair and returns a `LoggerStream` on one end
    /// and the raw socket for the other end.
    fn create_logger_stream() -> Result<(LoggerStream, zx::Socket), Error> {
        let (tx, rx) = zx::Socket::create_stream();
        LoggerStream::new(rx).context("Failed to create LoggerStream").map(|stream| (stream, tx))
    }

    /// Returns a random alphanumeric string of `size` characters.
    fn get_random_string(size: usize) -> String {
        let mut generator = rng();
        Alphanumeric.sample_string(&mut generator, size)
    }
}