1use std::borrow::Cow;
6use std::mem::replace;
7
8use crate::linealyzer::Linealyzer;
9use crate::Event;
10
/// Incremental parser that turns chunks of bytes into [`Event`]s.
///
/// All state needed to resume parsing is kept in the struct, so input may be
/// handed to [`EventSource::parse`] in arbitrarily sized pieces. The line
/// format handled (`event`/`data` fields, `:` comment lines, blank-line
/// dispatch, leading BOM) resembles the Server-Sent Events wire format.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventSource {
    /// Line splitter; `parse` passes each incoming chunk to its `feed` method
    /// and iterates over the lines it yields.
    linealyzer: Linealyzer,
    /// Set once the first line has been seen. A leading UTF-8 byte order mark
    /// is stripped from that first line only.
    processed_first_line: bool,
    /// Value of the most recent `event` field for the event being assembled.
    event_type: String,
    /// Text accumulated from `data` fields for the event being assembled,
    /// excluding the line breaks counted in `data_trailing_newlines`.
    data: String,
    /// Number of line breaks owed to `data` but not yet appended. Every `data`
    /// line adds one; they are written out before further text is appended,
    /// and on dispatch all but the last one are written out.
    data_trailing_newlines: usize,
}
27
28impl EventSource {
29 pub fn new() -> Self {
30 Self {
31 linealyzer: Linealyzer::new(),
32 processed_first_line: false,
33 event_type: String::new(),
34 data: String::new(),
35 data_trailing_newlines: 0,
36 }
37 }
38
39 pub fn parse(&mut self, bytes: &[u8]) -> Vec<Event> {
41 let mut ret = vec![];
42 for mut line in self.linealyzer.feed(bytes) {
43 if !self.processed_first_line {
46 self.processed_first_line = true;
47 if line.starts_with(b"\xef\xbb\xbf") {
48 line = match line {
49 Cow::Borrowed(b) => Cow::Borrowed(&b[3..]),
50 Cow::Owned(mut b) => {
51 b.drain(..3);
52 Cow::Owned(b)
53 }
54 };
55 }
56 }
57
58 if line.is_empty() {
59 if self.data.is_empty() && self.data_trailing_newlines == 0 {
60 self.event_type.clear();
61 } else {
62 let n = self.data_trailing_newlines - 1;
63 self.data.reserve(n);
64 for _ in 0..n {
65 self.data.push('\n')
66 }
67 self.data_trailing_newlines = 0;
68 ret.push(
69 Event::from_type_and_data(
73 replace(&mut self.event_type, String::new()),
74 replace(&mut self.data, String::new()),
75 )
76 .unwrap(),
77 );
78 }
79 } else if line[0] == b':' { } else {
81 let (name, value) = match line.iter().position(|b| *b == b':') {
82 Some(p) => {
83 let (name, mut value) = line.split_at(p);
84 value = &value[1..];
85 if !value.is_empty() && value[0] == b' ' {
86 value = &value[1..];
87 }
88 (
89 String::from_utf8_lossy(name).into_owned(),
90 String::from_utf8_lossy(value).into_owned(),
91 )
92 }
93 None => (String::from_utf8_lossy(&line).into_owned(), String::new()),
94 };
95
96 if name == "event" {
97 self.event_type = value;
98 } else if name == "data" {
99 if !value.is_empty() {
100 if self.data_trailing_newlines > 0 {
101 self.data.reserve(self.data_trailing_newlines + value.len());
102 for _ in 0..self.data_trailing_newlines {
103 self.data.push('\n');
104 }
105 self.data_trailing_newlines = 0;
106 }
107 if self.data.is_empty() {
108 self.data = value;
109 } else {
110 self.data.push_str(&value);
111 }
112 }
113 self.data_trailing_newlines += 1;
114 } }
116 }
117 ret
118 }
119}
120
/// Unit and property tests for [`EventSource::parse`].
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    #[test]
    fn event_and_data() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn data_before_event() {
        let bs = b"data: data\n\
                   event: type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn bom_stripped() {
        let bs = b"\xef\xbb\xbfevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn partial_bom_not_stripped() {
        // The first line keeps its two stray bytes, so its field name is not
        // `event` and the event type stays empty.
        let bs = b"\xef\xbbevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn bom_stripped_only_on_first_line() {
        // The BOM on the second line makes its field name unknown, so no data
        // is ever recorded and nothing is dispatched.
        let bs = b"\xef\xbb\xbfevent: type\n\
                   \xef\xbb\xbfdata: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![]);
    }

    #[test]
    fn invalid_utf8_replaced() {
        let bs = b"event: type\xff\n\
                   data: data\xff\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(
            events,
            vec![Event::from_type_and_data("type\u{FFFD}", "data\u{FFFD}").unwrap()]
        );
    }

    #[test]
    fn colon_allowed_in_field_value() {
        let bs = b"event: type\n\
                   data: :da:ta\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", ":da:ta").unwrap()]);
    }

    #[test]
    fn event_with_no_data_is_dropped() {
        let bs = b"event: type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![]);

        // The dropped event's type must not leak into the next event.
        let events = event_source.parse(b"data: data\n\n");
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn consecutive_event_replaces() {
        let bs = b"event: type\n\
                   data: data\n\
                   event: replaced_type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(
            events,
            vec![Event::from_type_and_data("replaced_type", "data").unwrap()]
        );
    }

    #[test]
    fn consecutive_data_concatenates_with_newline() {
        let bs = b"data: data1\n\
                   data: data2\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data1\ndata2").unwrap()]);
    }

    #[test]
    fn consecutive_empty_lines_does_nothing() {
        let bs = b"data: data\n\n\n\n\n\n\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn missing_field_value_is_empty_string() {
        // A bare `event` resets the type to "", and a bare `data` contributes
        // an empty line to the payload.
        let bs = b"event: type\n\
                   data: data\n\
                   event\n\
                   data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data\n").unwrap()]);
    }

    #[test]
    fn field_value_without_leading_space_not_stripped() {
        let bs = b"event: type\n\
                   data:data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn only_first_field_value_space_stripped() {
        // Two spaces follow the colon: exactly one is stripped, the other is
        // part of the value.
        let bs = b"event: type\n\
                   data:  data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", " data").unwrap()]);
    }

    #[test]
    fn comment_lines_ignored() {
        let bs = b"event: type\n\
                   :event: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn unknown_field_names_ignored() {
        let bs = b"event: type\n\
                   event2: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn event_and_data_persisted_across_parse() {
        let mut event_source = EventSource::new();

        assert_eq!(event_source.parse(b"event: type\n"), vec![]);
        assert_eq!(event_source.parse(b"data: data\n"), vec![]);
        assert_eq!(
            event_source.parse(b"\n"),
            vec![Event::from_type_and_data("type", "data").unwrap()]
        );
    }

    #[test]
    fn event_and_data_not_persisted_across_parse_after_dispatch() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);

        assert_eq!(
            event_source.parse(b"data: data2\n\n"),
            vec![Event::from_type_and_data("", "data2").unwrap()]
        );
    }

    /// Parses `bytes` once for every way of cutting it into three consecutive
    /// chunks (empty chunks included) and checks that the concatenated output
    /// always equals `events`.
    fn assert_all_3_byte_partitionings(bytes: &[u8], events: Vec<Event>) {
        // Inclusive bounds so that cuts at the very end of the input (empty
        // trailing chunks) are exercised too.
        for i in 0..=bytes.len() {
            for j in i..=bytes.len() {
                let mut event_source = EventSource::new();
                let mut parsed_events = vec![];
                parsed_events.append(&mut event_source.parse(&bytes[..i]));
                parsed_events.append(&mut event_source.parse(&bytes[i..j]));
                parsed_events.append(&mut event_source.parse(&bytes[j..]));
                assert_eq!(parsed_events, events, "i: {}, j: {}, bytes: {:?}", i, j, bytes);
            }
        }
    }

    #[test]
    fn parse_event_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![Event::from_type_and_data("type", "data").unwrap()],
        );
    }

    #[test]
    fn parse_two_events_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\
                   \n\
                   data: data2\n\
                   event: type2\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![
                Event::from_type_and_data("type", "data").unwrap(),
                Event::from_type_and_data("type2", "data2").unwrap(),
            ],
        );
    }

    // Arbitrary events: type without line breaks, non-empty data without CR.
    prop_compose! {
        fn random_event()
            (event_type in "[^\r\n]{0,20}",
             data in "[^\r]{1,20}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    // Events built only from characters that are significant to the parser
    // (`:` in the type, line feeds in the data).
    prop_compose! {
        fn random_adversarial_event()
            (event_type in "[a:]{0,3}",
             data in "[a\n]{1,5}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig{
            failure_persistence: None,
            .. ProptestConfig::default()
        })]

        #[test]
        fn random_event_serialize_deserialize(
            event in random_event())
        {
            let mut bytes = vec![];
            event.to_writer(&mut bytes).unwrap();
            assert_eq!(EventSource::new().parse(&bytes), vec![event.clone()]);
        }

        #[test]
        fn random_events_serialize_deserialize(
            events in prop::collection::vec(random_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize(
            events in prop::collection::vec(random_adversarial_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_event(), 0..4))
        {
            assert_all_3_line_partitionings(events);
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_adversarial_event(), 0..4))
        {
            assert_all_3_line_partitionings(events);
        }
    }

    /// Serializes `events`, then re-parses the bytes once for every way of
    /// cutting them at line feeds into three groups of lines, feeding each
    /// separating line feed as its own chunk, and checks that the parsed
    /// output always equals `events`.
    fn assert_all_3_line_partitionings(events: Vec<Event>) {
        let mut bytes = vec![];
        for event in events.iter() {
            event.to_writer(&mut bytes).unwrap();
        }

        let splits: Vec<&[u8]> = bytes.split(|b| *b == b'\n').collect();
        for i in 0..=splits.len() {
            for j in i..=splits.len() {
                let mut event_source = EventSource::new();
                let mut parsed_events = vec![];
                parsed_events.append(&mut event_source.parse(&splits[0..i].join(&b'\n')));
                if i > 0 {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                parsed_events.append(&mut event_source.parse(&splits[i..j].join(&b'\n')));
                if j > i {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                parsed_events.append(&mut event_source.parse(&splits[j..].join(&b'\n')));
                if j < splits.len() {
                    parsed_events.append(&mut event_source.parse(b"\n"));
                }
                assert_eq!(parsed_events, events, "i: {}, j: {}", i, j);
            }
        }
    }
}