1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use fuchsia_async::{self as fasync, ReadableHandle, ReadableState};
67use futures::Stream;
8use std::pin::Pin;
9use std::task::{ready, Context, Poll};
10use thiserror::Error;
/// Delimiter byte (ASCII line feed) at which socket data is split into lines.
const NEWLINE: u8 = b'\n';
1314/// Splits the bytes from a streaming socket into newlines suitable for forwarding to LogSink.
15/// Returned chunks may not be complete newlines if single lines are over the size limit for a log
16/// message.
17///
18/// This implementation prioritizes standing memory usage over the number of copies or allocations
19/// made. Log forwarding is not particularly throughput sensitive, but keeping around lots of large
20/// buffers takes up memory.
21pub struct NewlineChunker {
22 socket: fasync::Socket,
23 buffer: Vec<u8>,
24 is_terminated: bool,
25 max_message_size: usize,
26 trim_newlines: bool,
27}
2829impl NewlineChunker {
30/// Creates a `NewlineChunker` that does not include the trailing `\n` in each line.
31pub fn new(socket: fasync::Socket, max_message_size: usize) -> Self {
32Self { socket, buffer: vec![], is_terminated: false, max_message_size, trim_newlines: true }
33 }
3435/// Creates a `NewlineChunker` that includes the trailing `\n` in each line.
36pub fn new_with_newlines(socket: fasync::Socket, max_message_size: usize) -> Self {
37Self {
38 socket,
39 buffer: vec![],
40 is_terminated: false,
41 max_message_size,
42 trim_newlines: false,
43 }
44 }
4546/// Removes and returns the next line or maximum-size chunk from the head of the buffer if
47 /// available.
48fn next_chunk_from_buffer(&mut self) -> Option<Vec<u8>> {
49let new_tail_start =
50if let Some(mut newline_pos) = self.buffer.iter().position(|&b| b == NEWLINE) {
51// start the tail 1 past the last newline encountered
52while let Some(&NEWLINE) = self.buffer.get(newline_pos + 1) {
53 newline_pos += 1;
54 }
55 newline_pos + 1
56} else if self.buffer.len() >= self.max_message_size {
57// we have to check the length *after* looking for newlines in case a single socket
58 // read was larger than the max size but contained newlines in the first
59 // self.max_message_size bytes
60self.max_message_size
61 } else {
62// no newlines, and the bytes in the buffer are too few to force chunking
63return None;
64 };
6566// the tail becomes the head for the next chunk
67let new_tail = self.buffer.split_off(new_tail_start);
68let mut next_chunk = std::mem::replace(&mut self.buffer, new_tail);
6970if self.trim_newlines {
71// remove the newlines from the end of the chunk we're returning
72while let Some(&NEWLINE) = next_chunk.last() {
73 next_chunk.pop();
74 }
75 }
7677Some(next_chunk)
78 }
7980fn end_of_stream(&mut self) -> Poll<Option<Vec<u8>>> {
81if !self.buffer.is_empty() {
82// the buffer is under the forced chunk size because the first return didn't happen
83Poll::Ready(Some(std::mem::replace(&mut self.buffer, vec![])))
84 } else {
85// end the stream
86self.is_terminated = true;
87 Poll::Ready(None)
88 }
89 }
90}
9192impl Stream for NewlineChunker {
93type Item = Result<Vec<u8>, NewlineChunkerError>;
9495fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96let this = self.get_mut();
9798if this.is_terminated {
99return Poll::Ready(None);
100 }
101102// first check to see if previous socket reads have left us with lines in the buffer
103if let Some(chunk) = this.next_chunk_from_buffer() {
104return Poll::Ready(Some(Ok(chunk)));
105 }
106107loop {
108// we don't have a chunk to return, poll for reading the socket
109let readable_state = futures::ready!(this.socket.poll_readable(cx))
110 .map_err(NewlineChunkerError::PollReadable)?;
111112// find out how much buffer we should make available
113let bytes_in_socket = this
114 .socket
115 .as_ref()
116 .outstanding_read_bytes()
117 .map_err(NewlineChunkerError::OutstandingReadBytes)?;
118if bytes_in_socket == 0 {
119if readable_state == ReadableState::MaybeReadableAndClosed {
120return this.end_of_stream().map(|buf| buf.map(Ok));
121 }
122// if there are no bytes available this socket should not be considered readable
123ready!(this.socket.need_readable(cx).map_err(NewlineChunkerError::NeedReadable)?);
124continue;
125 }
126127// don't make the buffer bigger than necessary to get a chunk out
128let bytes_to_read = std::cmp::min(bytes_in_socket, this.max_message_size);
129let prev_len = this.buffer.len();
130131// grow the size of the buffer to make space for the pending read, if it fails we'll
132 // need to shrink it back down before any subsequent calls to poll_next
133this.buffer.resize(prev_len + bytes_to_read, 0);
134135let bytes_read = match this.socket.as_ref().read(&mut this.buffer[prev_len..]) {
136Ok(b) => b,
137Err(zx::Status::PEER_CLOSED) => return this.end_of_stream().map(|buf| buf.map(Ok)),
138Err(zx::Status::SHOULD_WAIT) => {
139// reset the size of the buffer to exclude the 0's we wrote above
140this.buffer.truncate(prev_len);
141return Poll::Ready(Some(Err(NewlineChunkerError::ShouldWait)));
142 }
143Err(status) => {
144// reset the size of the buffer to exclude the 0's we wrote above
145this.buffer.truncate(prev_len);
146return Poll::Ready(Some(Err(NewlineChunkerError::ReadSocket(status))));
147 }
148 };
149150// handle possible short reads
151this.buffer.truncate(prev_len + bytes_read);
152153// we got something out of the socket
154if let Some(chunk) = this.next_chunk_from_buffer() {
155// and its enough for a chunk
156return Poll::Ready(Some(Ok(chunk)));
157 } else {
158// it is not enough for a chunk, request notification when there's more
159ready!(this.socket.need_readable(cx).map_err(NewlineChunkerError::NeedReadable)?);
160 }
161 }
162 }
163}
164165#[derive(Debug, Error)]
166pub enum NewlineChunkerError {
167#[error("got SHOULD_WAIT from socket read after confirming outstanding_read_bytes > 0")]
168ShouldWait,
169170#[error("failed to read from socket")]
171ReadSocket(#[source] zx::Status),
172173#[error("failed to get readable state for socket")]
174PollReadable(#[source] zx::Status),
175176#[error("failed to register readable signal for socket")]
177NeedReadable(#[source] zx::Status),
178179#[error("failed to get number of outstanding readable bytes in socket")]
180OutstandingReadBytes(#[source] zx::Status),
181}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;

    /// Creates a connected stream-socket pair, wrapping the reading end for async use.
    fn socket_pair() -> (fasync::Socket, zx::Socket) {
        let (reader, writer) = zx::Socket::create_stream();
        (fasync::Socket::from_socket(reader), writer)
    }

    /// Pulls the next chunk, panicking if the stream ended or yielded an error.
    async fn next_chunk(chunker: &mut NewlineChunker) -> Vec<u8> {
        chunker.next().await.unwrap().unwrap()
    }

    #[fuchsia::test]
    async fn parse_bytes_with_newline() {
        let (reader, writer) = socket_pair();
        let mut chunker = NewlineChunker::new(reader, 100);
        writer.write(b"test\n").expect("Failed to write");
        assert_eq!(next_chunk(&mut chunker).await, b"test".to_vec());
    }

    #[fuchsia::test]
    async fn parse_bytes_with_many_newlines() {
        let (reader, writer) = socket_pair();
        let mut chunker = NewlineChunker::new(reader, 100);
        writer.write(b"test1\ntest2\ntest3\n").expect("Failed to write");
        let expected: [&[u8]; 3] = [b"test1", b"test2", b"test3"];
        for line in &expected {
            assert_eq!(next_chunk(&mut chunker).await, line.to_vec());
        }
        drop(writer);
        assert!(chunker.next().await.is_none());
    }

    #[fuchsia::test]
    async fn parse_bytes_with_newlines_included() {
        let (reader, writer) = socket_pair();
        let mut chunker = NewlineChunker::new_with_newlines(reader, 100);
        writer.write(b"test1\ntest2\ntest3\n").expect("Failed to write");
        let expected: [&[u8]; 3] = [b"test1\n", b"test2\n", b"test3\n"];
        for line in &expected {
            assert_eq!(next_chunk(&mut chunker).await, line.to_vec());
        }
    }

    #[fuchsia::test]
    async fn max_message_size() {
        let (reader, writer) = socket_pair();
        let mut chunker = NewlineChunker::new(reader, 2);
        writer.write(b"test\n").expect("Failed to write");
        let expected: [&[u8]; 2] = [b"te", b"st"];
        for piece in &expected {
            assert_eq!(next_chunk(&mut chunker).await, piece.to_vec());
        }
    }
}