// http_sse/source.rs
//
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::borrow::Cow;
use std::mem::replace;

use crate::linealyzer::Linealyzer;
use crate::Event;

11/// `EventSource` `parse`s byte slices from an http sse connection into sse `Events`.
12/// It maintains state across `parse` calls, so that an `Event` whose bytes
13/// are split across two calls to `parse` will be returned by the second call.
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct EventSource {
16    linealyzer: Linealyzer,
17    processed_first_line: bool,
18    event_type: String,
19    data: String,
20    // http sse spec says to append a newline to the data buffer on each receipt
21    // of a data field, and then to remove the last newline from the data buffer
22    // before reconstituting the event. Appending the newline forces an extra
23    // allocation/str copy on the simple path of events with single-line data,
24    // so we keep track of how many newlines to append instead.
25    data_trailing_newlines: usize,
26}
27
28impl EventSource {
29    pub fn new() -> Self {
30        Self {
31            linealyzer: Linealyzer::new(),
32            processed_first_line: false,
33            event_type: String::new(),
34            data: String::new(),
35            data_trailing_newlines: 0,
36        }
37    }
38
39    /// Ingest more bytes from an http sse stream and return all completed `Event`s.
40    pub fn parse(&mut self, bytes: &[u8]) -> Vec<Event> {
41        let mut ret = vec![];
42        for mut line in self.linealyzer.feed(bytes) {
43            // http sse stream is allowed to begin with an optional utf8-encoded
44            // byte order mark (bom), that should be ignored.
45            if !self.processed_first_line {
46                self.processed_first_line = true;
47                if line.starts_with(b"\xef\xbb\xbf") {
48                    line = match line {
49                        Cow::Borrowed(b) => Cow::Borrowed(&b[3..]),
50                        Cow::Owned(mut b) => {
51                            b.drain(..3);
52                            Cow::Owned(b)
53                        }
54                    };
55                }
56            }
57
58            if line.is_empty() {
59                if self.data.is_empty() && self.data_trailing_newlines == 0 {
60                    self.event_type.clear();
61                } else {
62                    let n = self.data_trailing_newlines - 1;
63                    self.data.reserve(n);
64                    for _ in 0..n {
65                        self.data.push('\n')
66                    }
67                    self.data_trailing_newlines = 0;
68                    ret.push(
69                        // self.event_type cannot have carriage returns or newlines
70                        // self.data cannot be empty or have carriage returns in it
71                        // so event creation should never fail
72                        Event::from_type_and_data(
73                            replace(&mut self.event_type, String::new()),
74                            replace(&mut self.data, String::new()),
75                        )
76                        .unwrap(),
77                    );
78                }
79            } else if line[0] == b':' { // ignore these lines
80            } else {
81                let (name, value) = match line.iter().position(|b| *b == b':') {
82                    Some(p) => {
83                        let (name, mut value) = line.split_at(p);
84                        value = &value[1..];
85                        if !value.is_empty() && value[0] == b' ' {
86                            value = &value[1..];
87                        }
88                        (
89                            String::from_utf8_lossy(name).into_owned(),
90                            String::from_utf8_lossy(value).into_owned(),
91                        )
92                    }
93                    None => (String::from_utf8_lossy(&line).into_owned(), String::new()),
94                };
95
96                if name == "event" {
97                    self.event_type = value;
98                } else if name == "data" {
99                    if !value.is_empty() {
100                        if self.data_trailing_newlines > 0 {
101                            self.data.reserve(self.data_trailing_newlines + value.len());
102                            for _ in 0..self.data_trailing_newlines {
103                                self.data.push('\n');
104                            }
105                            self.data_trailing_newlines = 0;
106                        }
107                        if self.data.is_empty() {
108                            self.data = value;
109                        } else {
110                            self.data.push_str(&value);
111                        }
112                    }
113                    self.data_trailing_newlines += 1;
114                } // ignoring unrecognized field names, including "id" and "retry"
115            }
116        }
117        ret
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    #[test]
    fn event_and_data() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn data_before_event() {
        let bs = b"data: data\n\
                   event: type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn bom_stripped() {
        let bs = b"\xef\xbb\xbfevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn partial_bom_not_stripped() {
        let bs = b"\xef\xbbevent: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn bom_stripped_only_on_first_line() {
        let bs = b"\xef\xbb\xbfevent: type\n\
                   \xef\xbb\xbfdata: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![]);
    }

    #[test]
    fn invalid_utf8_replaced() {
        let bs = b"event: type\xff\n\
                   data: data\xff\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(
            events,
            vec![Event::from_type_and_data("type\u{FFFD}", "data\u{FFFD}").unwrap()]
        );
    }

    #[test]
    fn colon_allowed_in_field_value() {
        let bs = b"event: type\n\
                   data: :da:ta\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);

        assert_eq!(events, vec![Event::from_type_and_data("type", ":da:ta").unwrap()]);
    }

    #[test]
    fn event_with_no_data_is_dropped() {
        let bs = b"event: type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![]);

        let events = event_source.parse(b"data: data\n\n");
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn consecutive_event_replaces() {
        let bs = b"event: type\n\
                   data: data\n\
                   event: replaced_type\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("replaced_type", "data").unwrap()]);
    }

    #[test]
    fn consecutive_data_concatenates_with_newline() {
        let bs = b"data: data1\n\
                   data: data2\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data1\ndata2").unwrap()]);
    }

    #[test]
    fn consecutive_empty_lines_does_nothing() {
        let bs = b"data: data\n\n\n\n\n\n\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data").unwrap()]);
    }

    #[test]
    fn missing_field_value_is_empty_string() {
        let bs = b"event: type\n\
                   data: data\n\
                   event\n\
                   data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("", "data\n").unwrap()]);
    }

    #[test]
    fn field_value_without_leading_space_not_stripped() {
        let bs = b"event: type\n\
                   data:data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn only_first_field_value_space_stripped() {
        let bs = b"event: type\n\
                   data:  data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", " data").unwrap()]);
    }

    #[test]
    fn comment_lines_ignored() {
        let bs = b"event: type\n\
                   :event: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn unknown_field_names_ignored() {
        let bs = b"event: type\n\
                   event2: other_type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);
    }

    #[test]
    fn event_and_data_persisted_across_parse() {
        let mut event_source = EventSource::new();

        assert_eq!(event_source.parse(b"event: type\n"), vec![]);
        assert_eq!(event_source.parse(b"data: data\n"), vec![]);
        assert_eq!(
            event_source.parse(b"\n"),
            vec![Event::from_type_and_data("type", "data").unwrap()]
        );
    }

    #[test]
    fn event_and_data_not_persisted_across_parse_after_dispatch() {
        let bs = b"event: type\n\
                   data: data\n\n";
        let mut event_source = EventSource::new();

        let events = event_source.parse(bs);
        assert_eq!(events, vec![Event::from_type_and_data("type", "data").unwrap()]);

        assert_eq!(
            event_source.parse(b"data: data2\n\n"),
            vec![Event::from_type_and_data("", "data2").unwrap()]
        );
    }

    /// Feeds `bytes` to a fresh `EventSource` in three chunks, for every pair of
    /// split points `0 <= i <= j <= bytes.len()` (including empty chunks at
    /// either end), and checks that exactly `events` come out.
    fn assert_all_3_byte_partitionings(bytes: &[u8], events: Vec<Event>) {
        for i in 0..=bytes.len() {
            for j in i..=bytes.len() {
                let mut event_source = EventSource::new();
                let parsed_events: Vec<Event> = [&bytes[..i], &bytes[i..j], &bytes[j..]]
                    .iter()
                    .flat_map(|chunk| event_source.parse(chunk))
                    .collect();
                assert_eq!(parsed_events, events, "i: {}, j: {}, bytes: {:?}", i, j, bytes);
            }
        }
    }

    #[test]
    fn parse_event_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![Event::from_type_and_data("type", "data").unwrap()],
        );
    }

    #[test]
    fn parse_two_events_all_3_byte_partitionings() {
        let bs = b"event: type\n\
                   data: data\n\
                   \n\
                   data: data2\n\
                   event: type2\n\n";
        assert_all_3_byte_partitionings(
            bs,
            vec![
                Event::from_type_and_data("type", "data").unwrap(),
                Event::from_type_and_data("type2", "data2").unwrap(),
            ],
        );
    }

    prop_compose! {
        fn random_event()
            (event_type in "[^\r\n]{0,20}",
             data in "[^\r]{1,20}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    prop_compose! {
        fn random_adversarial_event()
            (event_type in "[a:]{0,3}",
             data in "[a\n]{1,5}") -> Event
        {
            Event::from_type_and_data(event_type, data).unwrap()
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig{
            // Disable persistence to avoid the warning for not running in the
            // source code directory (since we're running on a Fuchsia target)
            failure_persistence: None,
            .. ProptestConfig::default()
        })]

        #[test]
        fn random_event_serialize_deserialize(
            event in random_event())
        {
            let mut bytes = vec![];
            event.to_writer(&mut bytes).unwrap();
            assert_eq!(EventSource::new().parse(&bytes), vec![event.clone()]);
        }

        #[test]
        fn random_events_serialize_deserialize(
            events in prop::collection::vec(random_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize(
            events in prop::collection::vec(random_adversarial_event(), 0..10))
        {
            let mut bytes = vec![];
            for event in events.iter() {
                event.to_writer(&mut bytes).unwrap();
            }
            assert_eq!(EventSource::new().parse(&bytes), events);
        }

        #[test]
        fn random_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_event(), 0..4))
        {
            assert_all_3_line_partitionings(events);
        }

        #[test]
        fn random_adversarial_events_serialize_deserialize_all_3_line_partitionings(
            events in prop::collection::vec(random_adversarial_event(), 0..4))
        {
            assert_all_3_line_partitionings(events);
        }
    }

    /// Serializes `events`, then feeds the exact serialized bytes to a fresh
    /// `EventSource` in three chunks split at line boundaries, for every pair of
    /// boundaries, and checks that exactly `events` come out.
    fn assert_all_3_line_partitionings(events: Vec<Event>) {
        let mut bytes = vec![];
        for event in events.iter() {
            event.to_writer(&mut bytes).unwrap();
        }

        let lines: Vec<&[u8]> = bytes.split(|b| *b == b'\n').collect();
        let line_count = lines.len();
        // Rejoins `lines[start..end]`, keeping the '\n' that followed the
        // range's last line in `bytes` (there is none after the final line).
        let chunk = |start: usize, end: usize| -> Vec<u8> {
            let mut out = lines[start..end].join(&b'\n');
            if start < end && end < line_count {
                out.push(b'\n');
            }
            out
        };
        for i in 0..=line_count {
            for j in i..=line_count {
                let chunks = [chunk(0, i), chunk(i, j), chunk(j, line_count)];
                // The partition must reproduce the serialization byte for byte.
                assert_eq!(chunks.concat(), bytes, "i: {}, j: {}", i, j);

                let mut event_source = EventSource::new();
                let parsed_events: Vec<Event> =
                    chunks.iter().flat_map(|c| event_source.parse(c)).collect();
                assert_eq!(parsed_events, events, "i: {}, j: {}", i, j);
            }
        }
    }
}