Skip to main content

starnix_core/task/
syslog.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::security;
6use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
7use crate::task::{
8    CurrentTask, EventHandler, ThreadLockupDetector, WaitCallback, WaitCanceler, WaitQueue, Waiter,
9};
10use crate::vfs::OutputBuffer;
11use diagnostics_data::{Data, Logs, LogsData, Severity};
12use estimate_timeline::{DefaultFetcher, TimeFetcher, TimelineEstimator};
13use fidl_fuchsia_diagnostics as fdiagnostics;
14use fuchsia_component::client::connect_to_protocol_sync;
15use fuchsia_inspect::Inspector;
16use futures::FutureExt;
17use serde::Deserialize;
18use starnix_sync::{LockDepMutex, Locked, Mutex, SyslogStateLock, Unlocked};
19use starnix_uapi::auth::CAP_SYSLOG;
20use starnix_uapi::errors::{EAGAIN, Errno, errno, error};
21use starnix_uapi::syslog::SyslogAction;
22use starnix_uapi::vfs::FdEvents;
23use std::cmp;
24use std::collections::VecDeque;
25use std::io::{self, Write};
26use std::sync::atomic::Ordering;
27use std::sync::{Arc, OnceLock, mpsc};
28use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, TryFromBytes};
29
/// Constant value reported by `GrantedSyslog::size_buffer`.
const BUFFER_SIZE: i32 = 1_049_000;

/// Number of nanoseconds in one second.
const NANOS_PER_SECOND: i64 = 1_000_000_000;
/// Divisor `format_log` applies to a sub-second nanosecond remainder to obtain microseconds.
///
/// NOTE(review): despite the name, the value is the number of nanoseconds per microsecond.
const MICROS_PER_NANOSECOND: i64 = 1_000;
34
/// Syslog state: the subscription served to syscall readers plus the timeline estimator that is
/// shared with every `LogIterator`.
#[derive(Default)]
pub struct Syslog {
    /// Subscription handed out through `GrantedSyslog`; set once by `Syslog::init`.
    syscall_subscription: OnceLock<Mutex<LogSubscription>>,
    /// Estimator used by `format_log` to convert a record's boot timestamp to monotonic time.
    state: Arc<LockDepMutex<TimelineEstimator<DefaultFetcher>, SyslogStateLock>>,
}
40
/// Describes how the syslog is being accessed; `Syslog::validate_access` uses it to decide which
/// checks apply.
#[derive(Debug)]
pub enum SyslogAccess {
    /// Validated as a `SyslogAction::ReadAll` request (presumably a read of /dev/kmsg).
    DevKmsgRead,
    /// An action on /proc/kmsg. Only `SyslogAction::Open` goes through the capability check.
    ProcKmsg(SyslogAction),
    /// An action requested directly (presumably through the syslog syscall).
    Syscall(SyslogAction),
}
47
48impl Syslog {
49    pub fn init(&self, system_task: &CurrentTask) -> Result<(), anyhow::Error> {
50        let state = self.state.clone();
51        system_task.kernel.inspect_node.record_lazy_child("syslog", move || {
52            let state = state.clone();
53            async move {
54                let inspector = Inspector::default();
55                let state_guard = state.lock();
56                inspector.root().record_uint("max_timeline_size", state_guard.max_timeline_size());
57                inspector
58                    .root()
59                    .record_uint("timeline_overflows", state_guard.timeline_overflows());
60                Ok(inspector)
61            }
62            .boxed()
63        });
64
65        let subscription = LogSubscription::snapshot_then_subscribe(system_task)?;
66        self.syscall_subscription.set(Mutex::new(subscription)).expect("syslog inititialized once");
67        Ok(())
68    }
69
70    pub fn access(
71        &self,
72        current_task: &CurrentTask,
73        access: SyslogAccess,
74    ) -> Result<GrantedSyslog<'_>, Errno> {
75        Self::validate_access(current_task, access)?;
76        let syscall_subscription = self.subscription()?;
77        Ok(GrantedSyslog { syscall_subscription })
78    }
79
80    /// Validates that syslog access is unrestricted, or that the `current_task` has the relevant
81    /// capability, and applies the SELinux policy.
82    pub fn validate_access(current_task: &CurrentTask, access: SyslogAccess) -> Result<(), Errno> {
83        let (action, check_capabilities) = match access {
84            SyslogAccess::ProcKmsg(SyslogAction::Open) => (SyslogAction::Open, true),
85            SyslogAccess::DevKmsgRead => (SyslogAction::ReadAll, true),
86            SyslogAccess::Syscall(a) => (a, true),
87            // If we got here we already validated Open on /proc/kmsg.
88            SyslogAccess::ProcKmsg(a) => (a, false),
89        };
90
91        // According to syslog(2) man, ReadAll (3) and SizeBuffer (10) are allowed unprivileged
92        // access only if restrict_dmsg is 0.
93        let action_is_privileged = !matches!(
94            access,
95            SyslogAccess::Syscall(SyslogAction::ReadAll | SyslogAction::SizeBuffer)
96                | SyslogAccess::DevKmsgRead,
97        );
98        let restrict_dmesg = current_task.kernel().restrict_dmesg.load(Ordering::Relaxed);
99        if check_capabilities && (action_is_privileged || restrict_dmesg) {
100            security::check_task_capable(current_task, CAP_SYSLOG)?;
101        }
102
103        security::check_syslog_access(current_task, action)?;
104        Ok(())
105    }
106
107    pub fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
108        LogSubscription::snapshot_then_subscribe(current_task)
109    }
110
111    pub fn subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
112        LogSubscription::subscribe(current_task)
113    }
114
115    fn subscription(&self) -> Result<&Mutex<LogSubscription>, Errno> {
116        self.syscall_subscription.get().ok_or_else(|| errno!(ENOENT))
117    }
118}
119
120pub struct GrantedSyslog<'a> {
121    syscall_subscription: &'a Mutex<LogSubscription>,
122}
123
124impl GrantedSyslog<'_> {
125    pub fn read(&self, out: &mut dyn OutputBuffer) -> Result<i32, Errno> {
126        let mut subscription = self.syscall_subscription.lock();
127        if let Some(log) = subscription.try_next()? {
128            let size_to_write = cmp::min(log.len(), out.available() as usize);
129            out.write(&log[..size_to_write])?;
130            return Ok(size_to_write as i32);
131        }
132        Ok(0)
133    }
134
135    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
136        self.syscall_subscription.lock().wait(waiter, events, handler)
137    }
138
139    pub fn blocking_read(
140        &self,
141        locked: &mut Locked<Unlocked>,
142        current_task: &CurrentTask,
143        out: &mut dyn OutputBuffer,
144    ) -> Result<i32, Errno> {
145        let mut subscription = self.syscall_subscription.lock();
146        let mut write_log = |log: Vec<u8>| {
147            let size_to_write = cmp::min(log.len(), out.available() as usize);
148            out.write(&log[..size_to_write])?;
149            Ok(size_to_write as i32)
150        };
151        match subscription.try_next() {
152            Err(errno) if errno == EAGAIN => {}
153            Err(errno) => return Err(errno),
154            Ok(Some(log)) => return write_log(log),
155            Ok(None) => return Ok(0),
156        }
157        let waiter = Waiter::new();
158        loop {
159            let _w = subscription.wait(
160                &waiter,
161                FdEvents::POLLIN | FdEvents::POLLHUP,
162                WaitCallback::none(),
163            );
164            match subscription.try_next() {
165                Err(errno) if errno == EAGAIN => {}
166                Err(errno) => return Err(errno),
167                Ok(Some(log)) => return write_log(log),
168                Ok(None) => return Ok(0),
169            }
170            waiter.wait(locked, current_task)?;
171        }
172    }
173
174    pub fn read_all(
175        &self,
176        current_task: &CurrentTask,
177        out: &mut dyn OutputBuffer,
178    ) -> Result<i32, Errno> {
179        let mut subscription = LogSubscription::snapshot(current_task)?;
180        let mut buffer = ResultBuffer::new(out.available());
181        while let Some(log_result) = subscription.next() {
182            buffer.push(log_result?);
183        }
184        let result: Vec<u8> = buffer.into();
185        out.write(result.as_slice())?;
186        Ok(result.len() as i32)
187    }
188
189    pub fn size_unread(&self) -> Result<i32, Errno> {
190        let mut subscription = self.syscall_subscription.lock();
191        Ok(subscription.available()?.try_into().unwrap_or(std::i32::MAX))
192    }
193
194    pub fn size_buffer(&self) -> Result<i32, Errno> {
195        // For now always return a constant for this.
196        Ok(BUFFER_SIZE)
197    }
198}
199
/// Receiving end of a log stream that is pumped by a background listener thread (see
/// `LogSubscription::new_listening`).
#[derive(Debug)]
pub struct LogSubscription {
    /// A log that was already pulled from `receiver` by `available` but not yet consumed.
    pending: Option<Vec<u8>>,
    /// Channel on which the listener thread delivers formatted logs or errors.
    receiver: mpsc::Receiver<Result<Vec<u8>, Errno>>,
    /// Wait queue the listener thread notifies with `POLLIN` / `POLLHUP`.
    waiters: Arc<WaitQueue>,
}
206
/// Either a single value or a list of values. With `#[serde(untagged)]` the variants are tried
/// in declaration order when deserializing.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
    /// Zero or more values.
    Many(Vec<T>),
    /// Exactly one value.
    One(T),
}
213
214impl LogSubscription {
215    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
216        self.waiters.wait_async_fd_events(waiter, events, handler)
217    }
218
219    pub fn available(&mut self) -> Result<usize, Errno> {
220        if let Some(log) = &self.pending {
221            return Ok(log.len());
222        }
223        match self.try_next() {
224            Err(err) if err == EAGAIN => Ok(0),
225            Err(err) => Err(err),
226            Ok(Some(log)) => {
227                let size = log.len();
228                self.pending.replace(log);
229                return Ok(size);
230            }
231            Ok(None) => Ok(0),
232        }
233    }
234
235    fn snapshot(current_task: &CurrentTask) -> Result<LogIterator, Errno> {
236        LogIterator::new(&current_task.kernel.syslog, fdiagnostics::StreamMode::Snapshot)
237    }
238
239    fn subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
240        Self::new_listening(current_task, fdiagnostics::StreamMode::Subscribe)
241    }
242
243    fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
244        Self::new_listening(current_task, fdiagnostics::StreamMode::SnapshotThenSubscribe)
245    }
246
247    fn new_listening(
248        current_task: &CurrentTask,
249        mode: fdiagnostics::StreamMode,
250    ) -> Result<Self, Errno> {
251        let iterator = LogIterator::new(&current_task.kernel.syslog, mode)?;
252        let (snd, receiver) = mpsc::sync_channel(1);
253        let waiters = Arc::new(WaitQueue::default());
254        let waiters_clone = waiters.clone();
255        let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
256            scopeguard::defer! {
257                waiters_clone.notify_fd_events(FdEvents::POLLHUP);
258            };
259            for log in iterator {
260                if snd.send(log).is_err() {
261                    break;
262                };
263                waiters_clone.notify_fd_events(FdEvents::POLLIN);
264            }
265        };
266        let req = SpawnRequestBuilder::new()
267            .with_debug_name("syslog-listener")
268            .with_sync_closure(closure)
269            .build();
270        current_task.kernel().kthreads.spawner().spawn_from_request(req);
271
272        Ok(Self { receiver, waiters, pending: Default::default() })
273    }
274
275    fn try_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
276        if let Some(value) = self.pending.take() {
277            return Ok(Some(value));
278        }
279        match self.receiver.try_recv() {
280            // We got the next log.
281            Ok(Ok(log)) => Ok(Some(log)),
282            // An error happened attempting to get the next log.
283            Ok(Err(err)) => Err(err),
284            // The channel was closed and there's no more messages in the queue.
285            Err(mpsc::TryRecvError::Disconnected) => Ok(None),
286            // No messages available but the channel hasn't closed.
287            Err(mpsc::TryRecvError::Empty) => error!(EAGAIN),
288        }
289    }
290}
291
/// Blocking iterator over formatted log lines pulled from a diagnostics `BatchIterator`.
struct LogIterator {
    /// Synchronous proxy used to fetch the next batch of log content.
    iterator: fdiagnostics::BatchIteratorSynchronousProxy,
    /// Contents of the last fetched batch that have not been parsed yet.
    pending_formatted_contents: VecDeque<fdiagnostics::FormattedContent>,
    /// Parsed records that have not been formatted and returned yet.
    pending_datas: VecDeque<Data<Logs>>,
    /// Timeline estimator shared with `Syslog`, passed to `format_log`.
    state: Arc<LockDepMutex<TimelineEstimator<DefaultFetcher>, SyslogStateLock>>,
    /// Source (moniker and URL) announced by manifest records, keyed by record tag.
    tags: std::collections::HashMap<u64, diagnostics_message::MonikerWithUrl>,
}
299
300impl LogIterator {
301    fn new(syslog: &Syslog, mode: fdiagnostics::StreamMode) -> Result<Self, Errno> {
302        let accessor = connect_to_protocol_sync::<fdiagnostics::ArchiveAccessorMarker>()
303            .map_err(|_| errno!(ENOENT, format!("Failed to connecto to ArchiveAccessor")))?;
304        let is_subscribe = matches!(mode, fdiagnostics::StreamMode::Subscribe);
305        let stream_parameters = fdiagnostics::StreamParameters {
306            stream_mode: Some(mode),
307            data_type: Some(fdiagnostics::DataType::Logs),
308            format: Some(fdiagnostics::Format::Fxt),
309            client_selector_configuration: Some(
310                fdiagnostics::ClientSelectorConfiguration::SelectAll(true),
311            ),
312            ..fdiagnostics::StreamParameters::default()
313        };
314        let (client_end, server_end) =
315            fidl::endpoints::create_endpoints::<fdiagnostics::BatchIteratorMarker>();
316        accessor.stream_diagnostics(&stream_parameters, server_end).map_err(|err| {
317            errno!(EIO, format!("ArchiveAccessor/StreamDiagnostics failed: {err}"))
318        })?;
319        let iterator = fdiagnostics::BatchIteratorSynchronousProxy::new(client_end.into_channel());
320        if is_subscribe {
321            let () = iterator.wait_for_ready(zx::MonotonicInstant::INFINITE).map_err(|err| {
322                errno!(EIO, format!("Failed to wait for BatchIterator being ready: {err}"))
323            })?;
324        }
325        Ok(Self {
326            iterator,
327            pending_formatted_contents: VecDeque::new(),
328            pending_datas: VecDeque::new(),
329            state: syslog.state.clone(),
330            tags: std::collections::HashMap::new(),
331        })
332    }
333
334    // TODO(b/315520045): Investigate if we should make this
335    // not allocate anything.
336    fn get_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
337        'main_loop: loop {
338            while let Some(data) = self.pending_datas.pop_front() {
339                if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
340                    return Ok(Some(log));
341                }
342            }
343            while let Some(formatted_content) = self.pending_formatted_contents.pop_front() {
344                let output: OneOrMany<Data<Logs>> = match formatted_content {
345                    fdiagnostics::FormattedContent::Fxt(data) => {
346                        let buf = data
347                            .read_to_vec(
348                                0,
349                                data.get_content_size().map_err(|a| {
350                                    errno!(EIO, format!("Error {a} getting VMO size"))
351                                })?,
352                            )
353                            .map_err(|err| {
354                                errno!(EIO, format!("failed to read logs vmo: {err}"))
355                            })?;
356                        let mut current_slice = buf.as_ref();
357                        let mut ret: Option<OneOrMany<LogsData>> = None;
358                        loop {
359                            let (record, remaining) =
360                                diagnostics_log_encoding::parse::parse_record(current_slice)
361                                    .map_err(|a| errno!(EIO, format!("Error {a} parsing FXT")))?;
362
363                            let record_len = current_slice.len() - remaining.len();
364                            let record_bytes = &current_slice[..record_len];
365
366                            let header = diagnostics_log_encoding::Header::read_from_bytes(
367                                &current_slice[..8],
368                            )
369                            .map_err(|_| errno!(EIO, "Invalid FXT header"))?;
370                            let tag = header.tag();
371                            let is_manifest =
372                                (tag & diagnostics_log_encoding::LOG_CONTROL_BIT) != 0;
373                            let actual_tag = tag & !diagnostics_log_encoding::LOG_CONTROL_BIT;
374
375                            if is_manifest {
376                                let mut moniker = None;
377                                let mut url = None;
378                                for arg in &record.arguments {
379                                    use diagnostics_log_encoding::Value;
380                                    if arg.name() == "moniker" {
381                                        if let Value::Text(t) = arg.value() {
382                                            moniker = Some(diagnostics_data::ExtendedMoniker::parse_str(&t).unwrap_or_else(|_| diagnostics_data::ExtendedMoniker::ComponentInstance(moniker::Moniker::parse_str("unknown").unwrap())));
383                                        }
384                                    } else if arg.name() == "url" {
385                                        if let Value::Text(t) = arg.value() {
386                                            url = Some(flyweights::FlyStr::new(t));
387                                        }
388                                    }
389                                }
390                                if let (Some(moniker), Some(url)) = (moniker, url) {
391                                    self.tags.insert(
392                                        actual_tag as u64,
393                                        diagnostics_message::MonikerWithUrl { moniker, url },
394                                    );
395                                }
396                            } else {
397                                let source = self
398                                    .tags
399                                    .get(&(actual_tag as u64))
400                                    .cloned()
401                                    .unwrap_or_else(|| diagnostics_message::MonikerWithUrl {
402                                        moniker:
403                                            diagnostics_data::ExtendedMoniker::ComponentInstance(
404                                                moniker::Moniker::parse_str("unknown").unwrap(),
405                                            ),
406                                        url: flyweights::FlyStr::new("unknown"),
407                                    });
408
409                                let data =
410                                    diagnostics_message::from_structured(source, record_bytes)
411                                        .map_err(|a| {
412                                            errno!(EIO, format!("Error {a} parsing FXT"))
413                                        })?;
414
415                                ret = Some(match ret.take() {
416                                    Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
417                                    Some(OneOrMany::Many(mut many)) => {
418                                        many.push(data);
419                                        OneOrMany::Many(many)
420                                    }
421                                    None => OneOrMany::One(data),
422                                });
423                            }
424
425                            if remaining.is_empty() {
426                                break;
427                            }
428                            current_slice = remaining;
429                        }
430                        ret.unwrap_or_else(|| OneOrMany::Many(vec![]))
431                    }
432                    format => {
433                        unreachable!("we only request and expect one format. Got: {format:?}")
434                    }
435                };
436                match output {
437                    OneOrMany::One(data) => {
438                        if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
439                            return Ok(Some(log));
440                        }
441                    }
442                    OneOrMany::Many(datas) => {
443                        if datas.len() > 0 {
444                            self.pending_datas.extend(datas);
445                            continue 'main_loop;
446                        }
447                    }
448                }
449            }
450            let next_batch = {
451                let _waiting_guard = ThreadLockupDetector::pause_tracking();
452                self.iterator
453                    .get_next(zx::MonotonicInstant::INFINITE)
454                    .map_err(|_| errno!(ENOENT))?
455                    .map_err(|_| errno!(ENOENT))?
456            };
457            if next_batch.is_empty() {
458                return Ok(None);
459            }
460            self.pending_formatted_contents = VecDeque::from(next_batch);
461        }
462    }
463}
464
465impl Iterator for LogIterator {
466    type Item = Result<Vec<u8>, Errno>;
467
468    fn next(&mut self) -> Option<Result<Vec<u8>, Errno>> {
469        self.get_next().transpose()
470    }
471}
472
473impl Iterator for LogSubscription {
474    type Item = Result<Vec<u8>, Errno>;
475
476    fn next(&mut self) -> Option<Self::Item> {
477        self.try_next().transpose()
478    }
479}
480
/// Size-bounded FIFO of log entries that keeps the newest entries whose combined length fits in
/// `max_size` bytes.
struct ResultBuffer {
    /// Maximum number of bytes the flattened result may contain.
    max_size: usize,
    /// Retained entries, oldest first.
    buffer: VecDeque<Vec<u8>>,
    /// Sum of the lengths of the entries in `buffer`.
    current_size: usize,
}

impl ResultBuffer {
    /// Creates an empty buffer bounded to `max_size` bytes.
    fn new(max_size: usize) -> Self {
        Self { max_size, buffer: VecDeque::default(), current_size: 0 }
    }

    /// Appends `data`, first evicting the oldest entries until `data` fits or nothing is left to
    /// evict.
    ///
    /// An entry larger than `max_size` is still stored (alone); it is trimmed when the buffer is
    /// converted into a `Vec<u8>`.
    fn push(&mut self, data: Vec<u8>) {
        while self.current_size + data.len() > self.max_size {
            let Some(evicted) = self.buffer.pop_front() else {
                break;
            };
            self.current_size -= evicted.len();
        }
        self.current_size += data.len();
        self.buffer.push_back(data);
    }
}

/// Flattens the retained entries, oldest first, into a single byte vector of at most `max_size`
/// bytes.
///
/// Implemented as `From` rather than `Into` so that both `Vec::from(buffer)` and `buffer.into()`
/// are available.
impl From<ResultBuffer> for Vec<u8> {
    fn from(buffer: ResultBuffer) -> Self {
        let mut result = Vec::with_capacity(buffer.current_size);
        for entry in &buffer.buffer {
            result.extend_from_slice(entry);
        }
        // If we still exceed the size (for example, a single message of size N in a buffer of
        // size M when N>M), we trim the output.
        result.truncate(buffer.max_size);
        result
    }
}
514
/// Log level carried in the `<N>` prefix of a kmsg line, stored as a `u8` with discriminants 0
/// through 7.
#[derive(Debug, Eq, PartialEq, Copy, Clone, KnownLayout, TryFromBytes, Immutable, IntoBytes)]
#[repr(u8)]
pub enum KmsgLevel {
    Emergency = 0,
    Alert = 1,
    Critical = 2,
    Error = 3,
    Warning = 4,
    Notice = 5,
    Info = 6,
    Debug = 7,
}

impl KmsgLevel {
    /// Converts a raw byte into a level, returning `None` when the conversion is rejected
    /// (presumably for any value that is not one of the discriminants above; the check is
    /// delegated to `zerocopy::try_transmute!`).
    fn from_raw(value: u8) -> Option<KmsgLevel> {
        zerocopy::try_transmute!(value).ok()
    }
}
533
534/// Given a string starting with <[0-9]*>, returns the level interpreted from the lower 3 bits.
535/// The next 8 is the facility, which we ignore atm.
536/// If the string doesn't start with a valid level, we return None.
537/// The slice returned is the rest of the message after the closing '>'.
538///
539/// Reference: https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
540pub(crate) fn extract_level(msg: &[u8]) -> Option<(KmsgLevel, &[u8])> {
541    let mut bytes_iter = msg.iter();
542    let Some(c) = bytes_iter.next() else {
543        return None;
544    };
545    if *c != b'<' {
546        return None;
547    }
548    let Some(end) = bytes_iter.enumerate().find(|(_, c)| **c == b'>').map(|(i, _)| i + 1) else {
549        return None;
550    };
551    std::str::from_utf8(&msg[1..end])
552        .ok()
553        .and_then(|s| s.parse::<u64>().ok())
554        .map(|level| (level & 0x07) as u8)
555        .and_then(KmsgLevel::from_raw)
556        .map(|level| (level, &msg[end + 1..]))
557}
558
559fn format_log<T: TimeFetcher>(
560    data: Data<Logs>,
561    state: &Arc<LockDepMutex<TimelineEstimator<T>, SyslogStateLock>>,
562) -> Result<Option<Vec<u8>>, io::Error> {
563    let mut formatted_tags = match data.tags() {
564        None => vec![],
565        Some(tags) => {
566            let mut formatted = vec![];
567            for (i, tag) in tags.iter().enumerate() {
568                // TODO(b/299533466): remove this.
569                if tag.contains("fxlogcat") {
570                    return Ok(None);
571                }
572                if i != 0 {
573                    write!(&mut formatted, ",")?;
574                }
575                write!(&mut formatted, "{tag}")?;
576            }
577            write!(&mut formatted, ": ")?;
578            formatted
579        }
580    };
581
582    let mut result = Vec::<u8>::new();
583    let (level, msg_after_level) = match data.msg().and_then(|msg| extract_level(msg.as_bytes())) {
584        Some((level, remaining_msg)) => (level as u8, Some(remaining_msg)),
585        None => match data.severity() {
586            Severity::Trace | Severity::Debug => (KmsgLevel::Debug as u8, None),
587            Severity::Info => (KmsgLevel::Info as u8, None),
588            Severity::Warn => (KmsgLevel::Warning as u8, None),
589            Severity::Error => (KmsgLevel::Error as u8, None),
590            Severity::Fatal => (KmsgLevel::Critical as u8, None),
591        },
592    };
593
594    // TODO(https://fxbug.dev/433724019): this isn't correct strictly speaking, but will be in most
595    // cases. We unapply the *current* offset and in the case where suspension happened between
596    // when the log message was generated and when Starnix is forwarding the log message, this will
597    // be different from the *actual* offset prior to suspension.
598    let time = state.lock().boot_time_to_monotonic_time(data.metadata.timestamp);
599    let time_nanos = time.into_nanos();
600    let time_secs = time_nanos / NANOS_PER_SECOND;
601    // Microsecond-level precision fractional time.
602    let time_fract = (time_nanos % NANOS_PER_SECOND) / MICROS_PER_NANOSECOND;
603    let component_name = data.component_name();
604    write!(&mut result, "<{level}>[{time_secs:05}.{time_fract:06}] {component_name}",)?;
605
606    match data.metadata.pid {
607        Some(pid) => write!(&mut result, "[{pid}]: ")?,
608        None => write!(&mut result, ": ")?,
609    }
610
611    result.append(&mut formatted_tags);
612
613    if let Some(msg) = msg_after_level {
614        write!(&mut result, "{}", String::from_utf8_lossy(msg))?;
615    } else if let Some(msg) = data.msg() {
616        write!(&mut result, "{msg}")?;
617    }
618
619    for kvp in data.payload_keys_strings() {
620        write!(&mut result, " {kvp}")?;
621    }
622    write!(&mut result, "\n")?;
623    Ok(Some(result))
624}
625
/// Unit tests for `ResultBuffer` and `extract_level`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers truncation, concatenation and eviction in `ResultBuffer`.
    #[test]
    fn test_result_buffer() {
        // A single entry larger than the buffer is trimmed to the buffer size.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(vec![0; 200]);
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 100);

        // Entries that fit are concatenated in insertion order.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(Vec::from_iter(0..20));
        buffer.push(Vec::from_iter(20..50));
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 50);
        for i in 0..50u8 {
            assert_eq!(result[i as usize], i);
        }

        // An oversized entry evicts the older one and is then trimmed from the end.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(Vec::from_iter(0..20));
        buffer.push(Vec::from_iter(20..150));
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 100);
        for i in 0..100u8 {
            assert_eq!(result[i as usize], i + 20u8);
        }

        // A later entry that fits evicts everything older, including the oversized one.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(Vec::from_iter(0..20));
        buffer.push(Vec::from_iter(20..150));
        buffer.push(Vec::from_iter(150..210));
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 60);
        for i in 0..60u8 {
            assert_eq!(result[i as usize], i + 150u8);
        }
    }

    /// Every level from 0 to 7 is parsed and the remainder of the message is returned.
    #[test]
    fn test_extract_level() {
        for level in 0..8 {
            let msg = format!("<{level}> some message");
            let result = extract_level(msg.as_bytes()).map(|(x, i)| (x as u8, i));
            assert_eq!(Some((level, " some message".as_bytes())), result);
        }
    }

    /// Only the lower 3 bits are used as the level: 100 & 0x07 == 4 (Warning).
    #[test]
    fn parse_message_accepts_levels_greater_than_7() {
        assert_eq!(
            Some((KmsgLevel::Warning, " message".as_bytes())),
            extract_level("<100> message".as_bytes())
        );
    }

    /// A non-numeric value between the brackets is not a level.
    #[test]
    fn parse_message_defaults_when_non_numbers() {
        assert_eq!(None, extract_level("<a> some message".as_bytes()));
    }

    /// A missing closing '>' is not a level.
    #[test]
    fn parse_message_defaults_when_invalid_level_syntax() {
        assert_eq!(None, extract_level("<1 some message".as_bytes()));
    }

    /// A message without any prefix has no level.
    #[test]
    fn parse_message_defaults_when_no_level() {
        assert_eq!(None, extract_level("some message".as_bytes()));
    }
}