Skip to main content

starnix_core/task/
syslog.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::security;
6use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
7use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
8use crate::vfs::OutputBuffer;
9use diagnostics_data::{Data, Logs, LogsData, Severity};
10use estimate_timeline::{DefaultFetcher, TimeFetcher, TimelineEstimator};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fuchsia_component::client::connect_to_protocol_sync;
13use fuchsia_inspect::Inspector;
14use futures::FutureExt;
15use serde::Deserialize;
16use starnix_sync::{Locked, Mutex, Unlocked};
17use starnix_uapi::auth::CAP_SYSLOG;
18use starnix_uapi::errors::{EAGAIN, Errno, errno, error};
19use starnix_uapi::syslog::SyslogAction;
20use starnix_uapi::vfs::FdEvents;
21use std::cmp;
22use std::collections::VecDeque;
23use std::io::{self, Write};
24use std::sync::atomic::Ordering;
25use std::sync::{Arc, OnceLock, mpsc};
26use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, TryFromBytes};
27
28const BUFFER_SIZE: i32 = 1_049_000;
29
30const NANOS_PER_SECOND: i64 = 1_000_000_000;
31const MICROS_PER_NANOSECOND: i64 = 1_000;
32
33#[derive(Default)]
34pub struct Syslog {
35    syscall_subscription: OnceLock<Mutex<LogSubscription>>,
36    state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
37}
38
39#[derive(Debug)]
40pub enum SyslogAccess {
41    DevKmsgRead,
42    ProcKmsg(SyslogAction),
43    Syscall(SyslogAction),
44}
45
46impl Syslog {
47    pub fn init(&self, system_task: &CurrentTask) -> Result<(), anyhow::Error> {
48        let state = self.state.clone();
49        system_task.kernel.inspect_node.record_lazy_child("syslog", move || {
50            let state = state.clone();
51            async move {
52                let inspector = Inspector::default();
53                let state_guard = state.lock();
54                inspector.root().record_uint("max_timeline_size", state_guard.max_timeline_size());
55                inspector
56                    .root()
57                    .record_uint("timeline_overflows", state_guard.timeline_overflows());
58                Ok(inspector)
59            }
60            .boxed()
61        });
62
63        let subscription = LogSubscription::snapshot_then_subscribe(system_task)?;
64        self.syscall_subscription.set(Mutex::new(subscription)).expect("syslog inititialized once");
65        Ok(())
66    }
67
68    pub fn access(
69        &self,
70        current_task: &CurrentTask,
71        access: SyslogAccess,
72    ) -> Result<GrantedSyslog<'_>, Errno> {
73        Self::validate_access(current_task, access)?;
74        let syscall_subscription = self.subscription()?;
75        Ok(GrantedSyslog { syscall_subscription })
76    }
77
78    /// Validates that syslog access is unrestricted, or that the `current_task` has the relevant
79    /// capability, and applies the SELinux policy.
80    pub fn validate_access(current_task: &CurrentTask, access: SyslogAccess) -> Result<(), Errno> {
81        let (action, check_capabilities) = match access {
82            SyslogAccess::ProcKmsg(SyslogAction::Open) => (SyslogAction::Open, true),
83            SyslogAccess::DevKmsgRead => (SyslogAction::ReadAll, true),
84            SyslogAccess::Syscall(a) => (a, true),
85            // If we got here we already validated Open on /proc/kmsg.
86            SyslogAccess::ProcKmsg(a) => (a, false),
87        };
88
89        // According to syslog(2) man, ReadAll (3) and SizeBuffer (10) are allowed unprivileged
90        // access only if restrict_dmsg is 0.
91        let action_is_privileged = !matches!(
92            access,
93            SyslogAccess::Syscall(SyslogAction::ReadAll | SyslogAction::SizeBuffer)
94                | SyslogAccess::DevKmsgRead,
95        );
96        let restrict_dmesg = current_task.kernel().restrict_dmesg.load(Ordering::Relaxed);
97        if check_capabilities && (action_is_privileged || restrict_dmesg) {
98            security::check_task_capable(current_task, CAP_SYSLOG)?;
99        }
100
101        security::check_syslog_access(current_task, action)?;
102        Ok(())
103    }
104
105    pub fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
106        LogSubscription::snapshot_then_subscribe(current_task)
107    }
108
109    pub fn subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
110        LogSubscription::subscribe(current_task)
111    }
112
113    fn subscription(&self) -> Result<&Mutex<LogSubscription>, Errno> {
114        self.syscall_subscription.get().ok_or_else(|| errno!(ENOENT))
115    }
116}
117
118pub struct GrantedSyslog<'a> {
119    syscall_subscription: &'a Mutex<LogSubscription>,
120}
121
122impl GrantedSyslog<'_> {
123    pub fn read(&self, out: &mut dyn OutputBuffer) -> Result<i32, Errno> {
124        let mut subscription = self.syscall_subscription.lock();
125        if let Some(log) = subscription.try_next()? {
126            let size_to_write = cmp::min(log.len(), out.available() as usize);
127            out.write(&log[..size_to_write])?;
128            return Ok(size_to_write as i32);
129        }
130        Ok(0)
131    }
132
133    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
134        self.syscall_subscription.lock().wait(waiter, events, handler)
135    }
136
137    pub fn blocking_read(
138        &self,
139        locked: &mut Locked<Unlocked>,
140        current_task: &CurrentTask,
141        out: &mut dyn OutputBuffer,
142    ) -> Result<i32, Errno> {
143        let mut subscription = self.syscall_subscription.lock();
144        let mut write_log = |log: Vec<u8>| {
145            let size_to_write = cmp::min(log.len(), out.available() as usize);
146            out.write(&log[..size_to_write])?;
147            Ok(size_to_write as i32)
148        };
149        match subscription.try_next() {
150            Err(errno) if errno == EAGAIN => {}
151            Err(errno) => return Err(errno),
152            Ok(Some(log)) => return write_log(log),
153            Ok(None) => return Ok(0),
154        }
155        let waiter = Waiter::new();
156        loop {
157            let _w = subscription.wait(
158                &waiter,
159                FdEvents::POLLIN | FdEvents::POLLHUP,
160                WaitCallback::none(),
161            );
162            match subscription.try_next() {
163                Err(errno) if errno == EAGAIN => {}
164                Err(errno) => return Err(errno),
165                Ok(Some(log)) => return write_log(log),
166                Ok(None) => return Ok(0),
167            }
168            waiter.wait(locked, current_task)?;
169        }
170    }
171
172    pub fn read_all(
173        &self,
174        current_task: &CurrentTask,
175        out: &mut dyn OutputBuffer,
176    ) -> Result<i32, Errno> {
177        let mut subscription = LogSubscription::snapshot(current_task)?;
178        let mut buffer = ResultBuffer::new(out.available());
179        while let Some(log_result) = subscription.next() {
180            buffer.push(log_result?);
181        }
182        let result: Vec<u8> = buffer.into();
183        out.write(result.as_slice())?;
184        Ok(result.len() as i32)
185    }
186
187    pub fn size_unread(&self) -> Result<i32, Errno> {
188        let mut subscription = self.syscall_subscription.lock();
189        Ok(subscription.available()?.try_into().unwrap_or(std::i32::MAX))
190    }
191
192    pub fn size_buffer(&self) -> Result<i32, Errno> {
193        // For now always return a constant for this.
194        Ok(BUFFER_SIZE)
195    }
196}
197
198#[derive(Debug)]
199pub struct LogSubscription {
200    pending: Option<Vec<u8>>,
201    receiver: mpsc::Receiver<Result<Vec<u8>, Errno>>,
202    waiters: Arc<WaitQueue>,
203}
204
205#[derive(Debug, Deserialize)]
206#[serde(untagged)]
207enum OneOrMany<T> {
208    Many(Vec<T>),
209    One(T),
210}
211
212impl LogSubscription {
213    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
214        self.waiters.wait_async_fd_events(waiter, events, handler)
215    }
216
217    pub fn available(&mut self) -> Result<usize, Errno> {
218        if let Some(log) = &self.pending {
219            return Ok(log.len());
220        }
221        match self.try_next() {
222            Err(err) if err == EAGAIN => Ok(0),
223            Err(err) => Err(err),
224            Ok(Some(log)) => {
225                let size = log.len();
226                self.pending.replace(log);
227                return Ok(size);
228            }
229            Ok(None) => Ok(0),
230        }
231    }
232
233    fn snapshot(current_task: &CurrentTask) -> Result<LogIterator, Errno> {
234        LogIterator::new(&current_task.kernel.syslog, fdiagnostics::StreamMode::Snapshot)
235    }
236
237    fn subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
238        Self::new_listening(current_task, fdiagnostics::StreamMode::Subscribe)
239    }
240
241    fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
242        Self::new_listening(current_task, fdiagnostics::StreamMode::SnapshotThenSubscribe)
243    }
244
245    fn new_listening(
246        current_task: &CurrentTask,
247        mode: fdiagnostics::StreamMode,
248    ) -> Result<Self, Errno> {
249        let iterator = LogIterator::new(&current_task.kernel.syslog, mode)?;
250        let (snd, receiver) = mpsc::sync_channel(1);
251        let waiters = Arc::new(WaitQueue::default());
252        let waiters_clone = waiters.clone();
253        let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
254            scopeguard::defer! {
255                waiters_clone.notify_fd_events(FdEvents::POLLHUP);
256            };
257            for log in iterator {
258                if snd.send(log).is_err() {
259                    break;
260                };
261                waiters_clone.notify_fd_events(FdEvents::POLLIN);
262            }
263        };
264        let req = SpawnRequestBuilder::new()
265            .with_debug_name("syslog-listener")
266            .with_sync_closure(closure)
267            .build();
268        current_task.kernel().kthreads.spawner().spawn_from_request(req);
269
270        Ok(Self { receiver, waiters, pending: Default::default() })
271    }
272
273    fn try_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
274        if let Some(value) = self.pending.take() {
275            return Ok(Some(value));
276        }
277        match self.receiver.try_recv() {
278            // We got the next log.
279            Ok(Ok(log)) => Ok(Some(log)),
280            // An error happened attempting to get the next log.
281            Ok(Err(err)) => Err(err),
282            // The channel was closed and there's no more messages in the queue.
283            Err(mpsc::TryRecvError::Disconnected) => Ok(None),
284            // No messages available but the channel hasn't closed.
285            Err(mpsc::TryRecvError::Empty) => error!(EAGAIN),
286        }
287    }
288}
289
290struct LogIterator {
291    iterator: fdiagnostics::BatchIteratorSynchronousProxy,
292    pending_formatted_contents: VecDeque<fdiagnostics::FormattedContent>,
293    pending_datas: VecDeque<Data<Logs>>,
294    state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
295    tags: std::collections::HashMap<u64, diagnostics_message::MonikerWithUrl>,
296}
297
298impl LogIterator {
299    fn new(syslog: &Syslog, mode: fdiagnostics::StreamMode) -> Result<Self, Errno> {
300        let accessor = connect_to_protocol_sync::<fdiagnostics::ArchiveAccessorMarker>()
301            .map_err(|_| errno!(ENOENT, format!("Failed to connecto to ArchiveAccessor")))?;
302        let is_subscribe = matches!(mode, fdiagnostics::StreamMode::Subscribe);
303        let stream_parameters = fdiagnostics::StreamParameters {
304            stream_mode: Some(mode),
305            data_type: Some(fdiagnostics::DataType::Logs),
306            format: Some(fdiagnostics::Format::Fxt),
307            client_selector_configuration: Some(
308                fdiagnostics::ClientSelectorConfiguration::SelectAll(true),
309            ),
310            ..fdiagnostics::StreamParameters::default()
311        };
312        let (client_end, server_end) =
313            fidl::endpoints::create_endpoints::<fdiagnostics::BatchIteratorMarker>();
314        accessor.stream_diagnostics(&stream_parameters, server_end).map_err(|err| {
315            errno!(EIO, format!("ArchiveAccessor/StreamDiagnostics failed: {err}"))
316        })?;
317        let iterator = fdiagnostics::BatchIteratorSynchronousProxy::new(client_end.into_channel());
318        if is_subscribe {
319            let () = iterator.wait_for_ready(zx::MonotonicInstant::INFINITE).map_err(|err| {
320                errno!(EIO, format!("Failed to wait for BatchIterator being ready: {err}"))
321            })?;
322        }
323        Ok(Self {
324            iterator,
325            pending_formatted_contents: VecDeque::new(),
326            pending_datas: VecDeque::new(),
327            state: syslog.state.clone(),
328            tags: std::collections::HashMap::new(),
329        })
330    }
331
332    // TODO(b/315520045): Investigate if we should make this
333    // not allocate anything.
334    fn get_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
335        'main_loop: loop {
336            while let Some(data) = self.pending_datas.pop_front() {
337                if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
338                    return Ok(Some(log));
339                }
340            }
341            while let Some(formatted_content) = self.pending_formatted_contents.pop_front() {
342                let output: OneOrMany<Data<Logs>> = match formatted_content {
343                    fdiagnostics::FormattedContent::Fxt(data) => {
344                        let buf = data
345                            .read_to_vec(
346                                0,
347                                data.get_content_size().map_err(|a| {
348                                    errno!(EIO, format!("Error {a} getting VMO size"))
349                                })?,
350                            )
351                            .map_err(|err| {
352                                errno!(EIO, format!("failed to read logs vmo: {err}"))
353                            })?;
354                        let mut current_slice = buf.as_ref();
355                        let mut ret: Option<OneOrMany<LogsData>> = None;
356                        loop {
357                            let (record, remaining) =
358                                diagnostics_log_encoding::parse::parse_record(current_slice)
359                                    .map_err(|a| errno!(EIO, format!("Error {a} parsing FXT")))?;
360
361                            let record_len = current_slice.len() - remaining.len();
362                            let record_bytes = &current_slice[..record_len];
363
364                            let header = diagnostics_log_encoding::Header::read_from_bytes(
365                                &current_slice[..8],
366                            )
367                            .map_err(|_| errno!(EIO, "Invalid FXT header"))?;
368                            let tag = header.tag();
369                            let is_manifest =
370                                (tag & diagnostics_log_encoding::LOG_CONTROL_BIT) != 0;
371                            let actual_tag = tag & !diagnostics_log_encoding::LOG_CONTROL_BIT;
372
373                            if is_manifest {
374                                let mut moniker = None;
375                                let mut url = None;
376                                for arg in &record.arguments {
377                                    use diagnostics_log_encoding::Value;
378                                    if arg.name() == "moniker" {
379                                        if let Value::Text(t) = arg.value() {
380                                            moniker = Some(diagnostics_data::ExtendedMoniker::parse_str(&t).unwrap_or_else(|_| diagnostics_data::ExtendedMoniker::ComponentInstance(moniker::Moniker::parse_str("unknown").unwrap())));
381                                        }
382                                    } else if arg.name() == "url" {
383                                        if let Value::Text(t) = arg.value() {
384                                            url = Some(flyweights::FlyStr::new(t));
385                                        }
386                                    }
387                                }
388                                if let (Some(moniker), Some(url)) = (moniker, url) {
389                                    self.tags.insert(
390                                        actual_tag as u64,
391                                        diagnostics_message::MonikerWithUrl { moniker, url },
392                                    );
393                                }
394                            } else {
395                                let source = self
396                                    .tags
397                                    .get(&(actual_tag as u64))
398                                    .cloned()
399                                    .unwrap_or_else(|| diagnostics_message::MonikerWithUrl {
400                                        moniker:
401                                            diagnostics_data::ExtendedMoniker::ComponentInstance(
402                                                moniker::Moniker::parse_str("unknown").unwrap(),
403                                            ),
404                                        url: flyweights::FlyStr::new("unknown"),
405                                    });
406
407                                let data =
408                                    diagnostics_message::from_structured(source, record_bytes)
409                                        .map_err(|a| {
410                                            errno!(EIO, format!("Error {a} parsing FXT"))
411                                        })?;
412
413                                ret = Some(match ret.take() {
414                                    Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
415                                    Some(OneOrMany::Many(mut many)) => {
416                                        many.push(data);
417                                        OneOrMany::Many(many)
418                                    }
419                                    None => OneOrMany::One(data),
420                                });
421                            }
422
423                            if remaining.is_empty() {
424                                break;
425                            }
426                            current_slice = remaining;
427                        }
428                        ret.unwrap_or_else(|| OneOrMany::Many(vec![]))
429                    }
430                    format => {
431                        unreachable!("we only request and expect one format. Got: {format:?}")
432                    }
433                };
434                match output {
435                    OneOrMany::One(data) => {
436                        if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
437                            return Ok(Some(log));
438                        }
439                    }
440                    OneOrMany::Many(datas) => {
441                        if datas.len() > 0 {
442                            self.pending_datas.extend(datas);
443                            continue 'main_loop;
444                        }
445                    }
446                }
447            }
448            let next_batch = self
449                .iterator
450                .get_next(zx::MonotonicInstant::INFINITE)
451                .map_err(|_| errno!(ENOENT))?
452                .map_err(|_| errno!(ENOENT))?;
453            if next_batch.is_empty() {
454                return Ok(None);
455            }
456            self.pending_formatted_contents = VecDeque::from(next_batch);
457        }
458    }
459}
460
461impl Iterator for LogIterator {
462    type Item = Result<Vec<u8>, Errno>;
463
464    fn next(&mut self) -> Option<Result<Vec<u8>, Errno>> {
465        self.get_next().transpose()
466    }
467}
468
469impl Iterator for LogSubscription {
470    type Item = Result<Vec<u8>, Errno>;
471
472    fn next(&mut self) -> Option<Self::Item> {
473        self.try_next().transpose()
474    }
475}
476
477struct ResultBuffer {
478    max_size: usize,
479    buffer: VecDeque<Vec<u8>>,
480    current_size: usize,
481}
482
483impl ResultBuffer {
484    fn new(max_size: usize) -> Self {
485        Self { max_size, buffer: VecDeque::default(), current_size: 0 }
486    }
487
488    fn push(&mut self, data: Vec<u8>) {
489        while !self.buffer.is_empty() && self.current_size + data.len() > self.max_size {
490            let old = self.buffer.pop_front().unwrap();
491            self.current_size -= old.len();
492        }
493        self.current_size += data.len();
494        self.buffer.push_back(data);
495    }
496}
497
498impl Into<Vec<u8>> for ResultBuffer {
499    fn into(self) -> Vec<u8> {
500        let mut result = Vec::with_capacity(self.current_size);
501        for mut item in self.buffer {
502            result.append(&mut item);
503        }
504        // If we still exceed the size (for example, a single message of size N in a buffer of
505        // size M when N>M), we trim the output.
506        result.truncate(self.max_size);
507        result
508    }
509}
510
511#[derive(Debug, Eq, PartialEq, Copy, Clone, KnownLayout, TryFromBytes, Immutable, IntoBytes)]
512#[repr(u8)]
513pub enum KmsgLevel {
514    Emergency = 0,
515    Alert = 1,
516    Critical = 2,
517    Error = 3,
518    Warning = 4,
519    Notice = 5,
520    Info = 6,
521    Debug = 7,
522}
523
524impl KmsgLevel {
525    fn from_raw(value: u8) -> Option<KmsgLevel> {
526        zerocopy::try_transmute!(value).ok()
527    }
528}
529
530/// Given a string starting with <[0-9]*>, returns the level interpreted from the lower 3 bits.
531/// The next 8 is the facility, which we ignore atm.
532/// If the string doesn't start with a valid level, we return None.
533/// The slice returned is the rest of the message after the closing '>'.
534///
535/// Reference: https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
536pub(crate) fn extract_level(msg: &[u8]) -> Option<(KmsgLevel, &[u8])> {
537    let mut bytes_iter = msg.iter();
538    let Some(c) = bytes_iter.next() else {
539        return None;
540    };
541    if *c != b'<' {
542        return None;
543    }
544    let Some(end) = bytes_iter.enumerate().find(|(_, c)| **c == b'>').map(|(i, _)| i + 1) else {
545        return None;
546    };
547    std::str::from_utf8(&msg[1..end])
548        .ok()
549        .and_then(|s| s.parse::<u64>().ok())
550        .map(|level| (level & 0x07) as u8)
551        .and_then(KmsgLevel::from_raw)
552        .map(|level| (level, &msg[end + 1..]))
553}
554
555fn format_log<T: TimeFetcher>(
556    data: Data<Logs>,
557    state: &Arc<Mutex<TimelineEstimator<T>>>,
558) -> Result<Option<Vec<u8>>, io::Error> {
559    let mut formatted_tags = match data.tags() {
560        None => vec![],
561        Some(tags) => {
562            let mut formatted = vec![];
563            for (i, tag) in tags.iter().enumerate() {
564                // TODO(b/299533466): remove this.
565                if tag.contains("fxlogcat") {
566                    return Ok(None);
567                }
568                if i != 0 {
569                    write!(&mut formatted, ",")?;
570                }
571                write!(&mut formatted, "{tag}")?;
572            }
573            write!(&mut formatted, ": ")?;
574            formatted
575        }
576    };
577
578    let mut result = Vec::<u8>::new();
579    let (level, msg_after_level) = match data.msg().and_then(|msg| extract_level(msg.as_bytes())) {
580        Some((level, remaining_msg)) => (level as u8, Some(remaining_msg)),
581        None => match data.severity() {
582            Severity::Trace | Severity::Debug => (KmsgLevel::Debug as u8, None),
583            Severity::Info => (KmsgLevel::Info as u8, None),
584            Severity::Warn => (KmsgLevel::Warning as u8, None),
585            Severity::Error => (KmsgLevel::Error as u8, None),
586            Severity::Fatal => (KmsgLevel::Critical as u8, None),
587        },
588    };
589
590    // TODO(https://fxbug.dev/433724019): this isn't correct strictly speaking, but will be in most
591    // cases. We unapply the *current* offset and in the case where suspension happened between
592    // when the log message was generated and when Starnix is forwarding the log message, this will
593    // be different from the *actual* offset prior to suspension.
594    let time = state.lock().boot_time_to_monotonic_time(data.metadata.timestamp);
595    let time_nanos = time.into_nanos();
596    let time_secs = time_nanos / NANOS_PER_SECOND;
597    // Microsecond-level precision fractional time.
598    let time_fract = (time_nanos % NANOS_PER_SECOND) / MICROS_PER_NANOSECOND;
599    let component_name = data.component_name();
600    write!(&mut result, "<{level}>[{time_secs:05}.{time_fract:06}] {component_name}",)?;
601
602    match data.metadata.pid {
603        Some(pid) => write!(&mut result, "[{pid}]: ")?,
604        None => write!(&mut result, ": ")?,
605    }
606
607    result.append(&mut formatted_tags);
608
609    if let Some(msg) = msg_after_level {
610        write!(&mut result, "{}", String::from_utf8_lossy(msg))?;
611    } else if let Some(msg) = data.msg() {
612        write!(&mut result, "{msg}")?;
613    }
614
615    for kvp in data.payload_keys_strings() {
616        write!(&mut result, " {kvp}")?;
617    }
618    write!(&mut result, "\n")?;
619    Ok(Some(result))
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625
626    #[test]
627    fn test_result_buffer() {
628        let mut buffer = ResultBuffer::new(100);
629        buffer.push(vec![0; 200]);
630        let result: Vec<u8> = buffer.into();
631        assert_eq!(result.len(), 100);
632
633        let mut buffer = ResultBuffer::new(100);
634        buffer.push(Vec::from_iter(0..20));
635        buffer.push(Vec::from_iter(20..50));
636        let result: Vec<u8> = buffer.into();
637        assert_eq!(result.len(), 50);
638        for i in 0..50u8 {
639            assert_eq!(result[i as usize], i);
640        }
641
642        let mut buffer = ResultBuffer::new(100);
643        buffer.push(Vec::from_iter(0..20));
644        buffer.push(Vec::from_iter(20..150));
645        let result: Vec<u8> = buffer.into();
646        assert_eq!(result.len(), 100);
647        for i in 0..100u8 {
648            assert_eq!(result[i as usize], i + 20u8);
649        }
650
651        let mut buffer = ResultBuffer::new(100);
652        buffer.push(Vec::from_iter(0..20));
653        buffer.push(Vec::from_iter(20..150));
654        buffer.push(Vec::from_iter(150..210));
655        let result: Vec<u8> = buffer.into();
656        assert_eq!(result.len(), 60);
657        for i in 0..60u8 {
658            assert_eq!(result[i as usize], i + 150u8);
659        }
660    }
661
662    #[test]
663    fn test_extract_level() {
664        for level in 0..8 {
665            let msg = format!("<{level}> some message");
666            let result = extract_level(msg.as_bytes()).map(|(x, i)| (x as u8, i));
667            assert_eq!(Some((level, " some message".as_bytes())), result);
668        }
669    }
670
671    #[test]
672    fn parse_message_accepts_levels_greater_than_7() {
673        assert_eq!(
674            Some((KmsgLevel::Warning, " message".as_bytes())),
675            extract_level("<100> message".as_bytes())
676        );
677    }
678
679    #[test]
680    fn parse_message_defaults_when_non_numbers() {
681        assert_eq!(None, extract_level("<a> some message".as_bytes()));
682    }
683
684    #[test]
685    fn parse_message_defaults_when_invalid_level_syntax() {
686        assert_eq!(None, extract_level("<1 some message".as_bytes()));
687    }
688
689    #[test]
690    fn parse_message_defaults_when_no_level() {
691        assert_eq!(None, extract_level("some message".as_bytes()));
692    }
693}