starnix_core/task/
syslog.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::security;
6use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
7use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
8use crate::vfs::OutputBuffer;
9use diagnostics_data::{Data, Logs, LogsData, Severity};
10use diagnostics_message::from_extended_record;
11use estimate_timeline::{DefaultFetcher, TimeFetcher, TimelineEstimator};
12use fidl_fuchsia_diagnostics as fdiagnostics;
13use fuchsia_component::client::connect_to_protocol_sync;
14use fuchsia_inspect::Inspector;
15use futures::FutureExt;
16use serde::Deserialize;
17use starnix_sync::{Locked, Mutex, Unlocked};
18use starnix_uapi::auth::CAP_SYSLOG;
19use starnix_uapi::errors::{EAGAIN, Errno, errno, error};
20use starnix_uapi::syslog::SyslogAction;
21use starnix_uapi::vfs::FdEvents;
22use std::cmp;
23use std::collections::VecDeque;
24use std::io::{self, Write};
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, OnceLock, mpsc};
27use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
28
/// Value reported by `GrantedSyslog::size_buffer`; the log buffer size is currently a fixed
/// constant rather than a measured quantity.
const BUFFER_SIZE: i32 = 1_049_000;

/// Number of nanoseconds in one second; used to split a timestamp into whole seconds and a
/// sub-second remainder.
const NANOS_PER_SECOND: i64 = 1_000_000_000;
/// Divisor that turns a sub-second nanosecond remainder into microseconds.
/// NOTE(review): despite the name, the value is the number of nanoseconds per microsecond.
const MICROS_PER_NANOSECOND: i64 = 1_000;
33
/// Kernel-wide syslog state.
#[derive(Default)]
pub struct Syslog {
    /// Subscription handed out through `GrantedSyslog`. Empty until `init` sets it, which happens
    /// at most once.
    syscall_subscription: OnceLock<Mutex<LogSubscription>>,
    /// Timeline estimator shared with every `LogIterator` (for timestamp conversion) and read by
    /// the lazy `syslog` inspect node.
    state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
}
39
/// Describes where a syslog request comes from, and (when applicable) which action it performs.
/// `Syslog::validate_access` uses this to decide which permission checks apply.
#[derive(Debug)]
pub enum SyslogAccess {
    /// A read of /dev/kmsg; validated as a `SyslogAction::ReadAll`.
    DevKmsgRead,
    /// An operation on /proc/kmsg. Only `SyslogAction::Open` triggers the capability check; other
    /// actions rely on the check already done at open time.
    ProcKmsg(SyslogAction),
    /// An action requested through the syslog syscall.
    Syscall(SyslogAction),
}
46
47impl Syslog {
48    pub fn init(&self, system_task: &CurrentTask) -> Result<(), anyhow::Error> {
49        let state = self.state.clone();
50        system_task.kernel.inspect_node.record_lazy_child("syslog", move || {
51            let state = state.clone();
52            async move {
53                let inspector = Inspector::default();
54                let state_guard = state.lock();
55                inspector.root().record_uint("max_timeline_size", state_guard.max_timeline_size());
56                inspector
57                    .root()
58                    .record_uint("timeline_overflows", state_guard.timeline_overflows());
59                Ok(inspector)
60            }
61            .boxed()
62        });
63
64        let subscription = LogSubscription::snapshot_then_subscribe(system_task)?;
65        self.syscall_subscription.set(Mutex::new(subscription)).expect("syslog inititialized once");
66        Ok(())
67    }
68
69    pub fn access(
70        &self,
71        current_task: &CurrentTask,
72        access: SyslogAccess,
73    ) -> Result<GrantedSyslog<'_>, Errno> {
74        Self::validate_access(current_task, access)?;
75        let syscall_subscription = self.subscription()?;
76        Ok(GrantedSyslog { syscall_subscription })
77    }
78
79    /// Validates that syslog access is unrestricted, or that the `current_task` has the relevant
80    /// capability, and applies the SELinux policy.
81    pub fn validate_access(current_task: &CurrentTask, access: SyslogAccess) -> Result<(), Errno> {
82        let (action, check_capabilities) = match access {
83            SyslogAccess::ProcKmsg(SyslogAction::Open) => (SyslogAction::Open, true),
84            SyslogAccess::DevKmsgRead => (SyslogAction::ReadAll, true),
85            SyslogAccess::Syscall(a) => (a, true),
86            // If we got here we already validated Open on /proc/kmsg.
87            SyslogAccess::ProcKmsg(a) => (a, false),
88        };
89
90        // According to syslog(2) man, ReadAll (3) and SizeBuffer (10) are allowed unprivileged
91        // access only if restrict_dmsg is 0.
92        let action_is_privileged = !matches!(
93            access,
94            SyslogAccess::Syscall(SyslogAction::ReadAll | SyslogAction::SizeBuffer)
95                | SyslogAccess::DevKmsgRead,
96        );
97        let restrict_dmesg = current_task.kernel().restrict_dmesg.load(Ordering::Relaxed);
98        if check_capabilities && (action_is_privileged || restrict_dmesg) {
99            security::check_task_capable(current_task, CAP_SYSLOG)?;
100        }
101
102        security::check_syslog_access(current_task, action)?;
103        Ok(())
104    }
105
106    pub fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
107        LogSubscription::snapshot_then_subscribe(current_task)
108    }
109
110    pub fn subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
111        LogSubscription::subscribe(current_task)
112    }
113
114    fn subscription(&self) -> Result<&Mutex<LogSubscription>, Errno> {
115        self.syscall_subscription.get().ok_or_else(|| errno!(ENOENT))
116    }
117}
118
/// Handle to the syslog, returned by `Syslog::access` once the access has been validated.
pub struct GrantedSyslog<'a> {
    /// The subscription owned by `Syslog` that backs the read operations.
    syscall_subscription: &'a Mutex<LogSubscription>,
}
122
123impl GrantedSyslog<'_> {
124    pub fn read(&self, out: &mut dyn OutputBuffer) -> Result<i32, Errno> {
125        let mut subscription = self.syscall_subscription.lock();
126        if let Some(log) = subscription.try_next()? {
127            let size_to_write = cmp::min(log.len(), out.available() as usize);
128            out.write(&log[..size_to_write])?;
129            return Ok(size_to_write as i32);
130        }
131        Ok(0)
132    }
133
134    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
135        self.syscall_subscription.lock().wait(waiter, events, handler)
136    }
137
138    pub fn blocking_read(
139        &self,
140        locked: &mut Locked<Unlocked>,
141        current_task: &CurrentTask,
142        out: &mut dyn OutputBuffer,
143    ) -> Result<i32, Errno> {
144        let mut subscription = self.syscall_subscription.lock();
145        let mut write_log = |log: Vec<u8>| {
146            let size_to_write = cmp::min(log.len(), out.available() as usize);
147            out.write(&log[..size_to_write])?;
148            Ok(size_to_write as i32)
149        };
150        match subscription.try_next() {
151            Err(errno) if errno == EAGAIN => {}
152            Err(errno) => return Err(errno),
153            Ok(Some(log)) => return write_log(log),
154            Ok(None) => return Ok(0),
155        }
156        let waiter = Waiter::new();
157        loop {
158            let _w = subscription.wait(
159                &waiter,
160                FdEvents::POLLIN | FdEvents::POLLHUP,
161                WaitCallback::none(),
162            );
163            match subscription.try_next() {
164                Err(errno) if errno == EAGAIN => {}
165                Err(errno) => return Err(errno),
166                Ok(Some(log)) => return write_log(log),
167                Ok(None) => return Ok(0),
168            }
169            waiter.wait(locked, current_task)?;
170        }
171    }
172
173    pub fn read_all(
174        &self,
175        current_task: &CurrentTask,
176        out: &mut dyn OutputBuffer,
177    ) -> Result<i32, Errno> {
178        let mut subscription = LogSubscription::snapshot(current_task)?;
179        let mut buffer = ResultBuffer::new(out.available());
180        while let Some(log_result) = subscription.next() {
181            buffer.push(log_result?);
182        }
183        let result: Vec<u8> = buffer.into();
184        out.write(result.as_slice())?;
185        Ok(result.len() as i32)
186    }
187
188    pub fn size_unread(&self) -> Result<i32, Errno> {
189        let mut subscription = self.syscall_subscription.lock();
190        Ok(subscription.available()?.try_into().unwrap_or(std::i32::MAX))
191    }
192
193    pub fn size_buffer(&self) -> Result<i32, Errno> {
194        // For now always return a constant for this.
195        Ok(BUFFER_SIZE)
196    }
197}
198
/// Receiving end of a stream of formatted logs produced by a background listener.
#[derive(Debug)]
pub struct LogSubscription {
    /// A log that `available` pulled from the channel to measure it, kept here until the next
    /// `try_next` hands it out.
    pending: Option<Vec<u8>>,
    /// Channel on which the listener delivers formatted logs (or the error that stopped it).
    receiver: mpsc::Receiver<Result<Vec<u8>, Errno>>,
    /// Notified with `POLLIN` for each delivered log and with `POLLHUP` when the listener exits.
    waiters: Arc<WaitQueue>,
}
205
/// Holds either a single value or a list of values. `LogIterator::get_next` uses it to carry the
/// records parsed out of one formatted content.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
    /// Two or more values, in the order they were parsed.
    Many(Vec<T>),
    /// Exactly one value.
    One(T),
}
212
213impl LogSubscription {
214    pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
215        self.waiters.wait_async_fd_events(waiter, events, handler)
216    }
217
218    pub fn available(&mut self) -> Result<usize, Errno> {
219        if let Some(log) = &self.pending {
220            return Ok(log.len());
221        }
222        match self.try_next() {
223            Err(err) if err == EAGAIN => Ok(0),
224            Err(err) => Err(err),
225            Ok(Some(log)) => {
226                let size = log.len();
227                self.pending.replace(log);
228                return Ok(size);
229            }
230            Ok(None) => Ok(0),
231        }
232    }
233
234    fn snapshot(current_task: &CurrentTask) -> Result<LogIterator, Errno> {
235        LogIterator::new(&current_task.kernel.syslog, fdiagnostics::StreamMode::Snapshot)
236    }
237
238    fn subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
239        Self::new_listening(current_task, fdiagnostics::StreamMode::Subscribe)
240    }
241
242    fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
243        Self::new_listening(current_task, fdiagnostics::StreamMode::SnapshotThenSubscribe)
244    }
245
246    fn new_listening(
247        current_task: &CurrentTask,
248        mode: fdiagnostics::StreamMode,
249    ) -> Result<Self, Errno> {
250        let iterator = LogIterator::new(&current_task.kernel.syslog, mode)?;
251        let (snd, receiver) = mpsc::sync_channel(1);
252        let waiters = Arc::new(WaitQueue::default());
253        let waiters_clone = waiters.clone();
254        let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
255            scopeguard::defer! {
256                waiters_clone.notify_fd_events(FdEvents::POLLHUP);
257            };
258            for log in iterator {
259                if snd.send(log).is_err() {
260                    break;
261                };
262                waiters_clone.notify_fd_events(FdEvents::POLLIN);
263            }
264        };
265        let req = SpawnRequestBuilder::new()
266            .with_debug_name("syslog-listener")
267            .with_sync_closure(closure)
268            .build();
269        current_task.kernel().kthreads.spawner().spawn_from_request(req);
270
271        Ok(Self { receiver, waiters, pending: Default::default() })
272    }
273
274    fn try_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
275        if let Some(value) = self.pending.take() {
276            return Ok(Some(value));
277        }
278        match self.receiver.try_recv() {
279            // We got the next log.
280            Ok(Ok(log)) => Ok(Some(log)),
281            // An error happened attempting to get the next log.
282            Ok(Err(err)) => Err(err),
283            // The channel was closed and there's no more messages in the queue.
284            Err(mpsc::TryRecvError::Disconnected) => Ok(None),
285            // No messages available but the channel hasn't closed.
286            Err(mpsc::TryRecvError::Empty) => error!(EAGAIN),
287        }
288    }
289}
290
/// Blocking iterator over formatted logs pulled from a diagnostics `BatchIterator`.
struct LogIterator {
    /// Synchronous proxy used to fetch batches of log content.
    iterator: fdiagnostics::BatchIteratorSynchronousProxy,
    /// Contents of the last fetched batch that have not been parsed yet.
    pending_formatted_contents: VecDeque<fdiagnostics::FormattedContent>,
    /// Records that were parsed but not yet formatted and returned.
    pending_datas: VecDeque<Data<Logs>>,
    /// Timeline estimator (shared with `Syslog`) passed to `format_log` for timestamps.
    state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
}
297
298impl LogIterator {
299    fn new(syslog: &Syslog, mode: fdiagnostics::StreamMode) -> Result<Self, Errno> {
300        let accessor = connect_to_protocol_sync::<fdiagnostics::ArchiveAccessorMarker>()
301            .map_err(|_| errno!(ENOENT, format!("Failed to connecto to ArchiveAccessor")))?;
302        let is_subscribe = matches!(mode, fdiagnostics::StreamMode::Subscribe);
303        let stream_parameters = fdiagnostics::StreamParameters {
304            stream_mode: Some(mode),
305            data_type: Some(fdiagnostics::DataType::Logs),
306            format: Some(fdiagnostics::Format::Fxt),
307            client_selector_configuration: Some(
308                fdiagnostics::ClientSelectorConfiguration::SelectAll(true),
309            ),
310            ..fdiagnostics::StreamParameters::default()
311        };
312        let (client_end, server_end) =
313            fidl::endpoints::create_endpoints::<fdiagnostics::BatchIteratorMarker>();
314        accessor.stream_diagnostics(&stream_parameters, server_end).map_err(|err| {
315            errno!(EIO, format!("ArchiveAccessor/StreamDiagnostics failed: {err}"))
316        })?;
317        let iterator = fdiagnostics::BatchIteratorSynchronousProxy::new(client_end.into_channel());
318        if is_subscribe {
319            let () = iterator.wait_for_ready(zx::MonotonicInstant::INFINITE).map_err(|err| {
320                errno!(EIO, format!("Failed to wait for BatchIterator being ready: {err}"))
321            })?;
322        }
323        Ok(Self {
324            iterator,
325            pending_formatted_contents: VecDeque::new(),
326            pending_datas: VecDeque::new(),
327            state: syslog.state.clone(),
328        })
329    }
330
331    // TODO(b/315520045): Investigate if we should make this
332    // not allocate anything.
333    fn get_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
334        'main_loop: loop {
335            while let Some(data) = self.pending_datas.pop_front() {
336                if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
337                    return Ok(Some(log));
338                }
339            }
340            while let Some(formatted_content) = self.pending_formatted_contents.pop_front() {
341                let output: OneOrMany<Data<Logs>> = match formatted_content {
342                    fdiagnostics::FormattedContent::Fxt(data) => {
343                        let buf = data
344                            .read_to_vec(
345                                0,
346                                data.get_content_size().map_err(|a| {
347                                    errno!(EIO, format!("Error {a} getting VMO size"))
348                                })?,
349                            )
350                            .map_err(|err| {
351                                errno!(EIO, format!("failed to read logs vmo: {err}"))
352                            })?;
353                        let mut current_slice = buf.as_ref();
354                        let mut ret: Option<OneOrMany<LogsData>> = None;
355                        loop {
356                            let (data, remaining) = from_extended_record(current_slice)
357                                .map_err(|a| errno!(EIO, format!("Error {a} parsing FXT")))?;
358                            ret = Some(match ret.take() {
359                                Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
360                                Some(OneOrMany::Many(mut many)) => {
361                                    many.push(data);
362                                    OneOrMany::Many(many)
363                                }
364                                None => OneOrMany::One(data),
365                            });
366                            if remaining.is_empty() {
367                                break;
368                            }
369                            current_slice = remaining;
370                        }
371                        ret.ok_or_else(|| errno!(EIO, format!("archivist returned invalid data")))?
372                    }
373                    format => {
374                        unreachable!("we only request and expect one format. Got: {format:?}")
375                    }
376                };
377                match output {
378                    OneOrMany::One(data) => {
379                        if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
380                            return Ok(Some(log));
381                        }
382                    }
383                    OneOrMany::Many(datas) => {
384                        if datas.len() > 0 {
385                            self.pending_datas.extend(datas);
386                            continue 'main_loop;
387                        }
388                    }
389                }
390            }
391            let next_batch = self
392                .iterator
393                .get_next(zx::MonotonicInstant::INFINITE)
394                .map_err(|_| errno!(ENOENT))?
395                .map_err(|_| errno!(ENOENT))?;
396            if next_batch.is_empty() {
397                return Ok(None);
398            }
399            self.pending_formatted_contents = VecDeque::from(next_batch);
400        }
401    }
402}
403
404impl Iterator for LogIterator {
405    type Item = Result<Vec<u8>, Errno>;
406
407    fn next(&mut self) -> Option<Result<Vec<u8>, Errno>> {
408        self.get_next().transpose()
409    }
410}
411
412impl Iterator for LogSubscription {
413    type Item = Result<Vec<u8>, Errno>;
414
415    fn next(&mut self) -> Option<Self::Item> {
416        self.try_next().transpose()
417    }
418}
419
/// Collects log messages up to a byte budget, discarding the oldest messages when a new one
/// would not fit.
struct ResultBuffer {
    /// Maximum number of bytes the final output may contain.
    max_size: usize,
    /// Retained messages, oldest first.
    buffer: VecDeque<Vec<u8>>,
    /// Sum of the lengths of all messages in `buffer`.
    current_size: usize,
}

impl ResultBuffer {
    /// Creates an empty buffer whose output is limited to `max_size` bytes.
    fn new(max_size: usize) -> Self {
        Self { max_size, buffer: VecDeque::default(), current_size: 0 }
    }

    /// Appends `data`, first evicting the oldest messages until it fits.
    ///
    /// If `data` alone exceeds `max_size`, every older message is evicted and `data` is still
    /// stored; the excess is trimmed when the buffer is converted into bytes.
    fn push(&mut self, data: Vec<u8>) {
        while self.current_size + data.len() > self.max_size {
            let Some(evicted) = self.buffer.pop_front() else {
                break;
            };
            self.current_size -= evicted.len();
        }
        self.current_size += data.len();
        self.buffer.push_back(data);
    }
}

/// Flattens the retained messages, oldest first, into at most `max_size` bytes.
impl From<ResultBuffer> for Vec<u8> {
    fn from(source: ResultBuffer) -> Self {
        let mut result = Vec::with_capacity(source.current_size);
        for message in &source.buffer {
            result.extend_from_slice(message);
        }
        // The total can only exceed the budget when a single message is larger than `max_size`;
        // in that case keep its leading bytes.
        result.truncate(source.max_size);
        result
    }
}
456
/// Log level of a kmsg record, stored as a `u8` in the range 0 (most severe) to 7.
///
/// `extract_level` obtains it from the lower three bits of the number in a `<N>` message prefix.
#[derive(Debug, Eq, PartialEq, Copy, Clone, KnownLayout, TryFromBytes, Immutable, IntoBytes)]
#[repr(u8)]
pub enum KmsgLevel {
    Emergency = 0,
    Alert = 1,
    Critical = 2,
    Error = 3,
    Warning = 4,
    Notice = 5,
    Info = 6,
    Debug = 7,
}
469
470impl KmsgLevel {
471    fn from_raw(value: u8) -> Option<KmsgLevel> {
472        zerocopy::try_transmute!(value).ok()
473    }
474}
475
476/// Given a string starting with <[0-9]*>, returns the level interpreted from the lower 3 bits.
477/// The next 8 is the facility, which we ignore atm.
478/// If the string doesn't start with a valid level, we return None.
479/// The slice returned is the rest of the message after the closing '>'.
480///
481/// Reference: https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
482pub(crate) fn extract_level(msg: &[u8]) -> Option<(KmsgLevel, &[u8])> {
483    let mut bytes_iter = msg.iter();
484    let Some(c) = bytes_iter.next() else {
485        return None;
486    };
487    if *c != b'<' {
488        return None;
489    }
490    let Some(end) = bytes_iter.enumerate().find(|(_, c)| **c == b'>').map(|(i, _)| i + 1) else {
491        return None;
492    };
493    std::str::from_utf8(&msg[1..end])
494        .ok()
495        .and_then(|s| s.parse::<u64>().ok())
496        .map(|level| (level & 0x07) as u8)
497        .and_then(KmsgLevel::from_raw)
498        .map(|level| (level, &msg[end + 1..]))
499}
500
501fn format_log<T: TimeFetcher>(
502    data: Data<Logs>,
503    state: &Arc<Mutex<TimelineEstimator<T>>>,
504) -> Result<Option<Vec<u8>>, io::Error> {
505    let mut formatted_tags = match data.tags() {
506        None => vec![],
507        Some(tags) => {
508            let mut formatted = vec![];
509            for (i, tag) in tags.iter().enumerate() {
510                // TODO(b/299533466): remove this.
511                if tag.contains("fxlogcat") {
512                    return Ok(None);
513                }
514                if i != 0 {
515                    write!(&mut formatted, ",")?;
516                }
517                write!(&mut formatted, "{tag}")?;
518            }
519            write!(&mut formatted, ": ")?;
520            formatted
521        }
522    };
523
524    let mut result = Vec::<u8>::new();
525    let (level, msg_after_level) = match data.msg().and_then(|msg| extract_level(msg.as_bytes())) {
526        Some((level, remaining_msg)) => (level as u8, Some(remaining_msg)),
527        None => match data.severity() {
528            Severity::Trace | Severity::Debug => (KmsgLevel::Debug as u8, None),
529            Severity::Info => (KmsgLevel::Info as u8, None),
530            Severity::Warn => (KmsgLevel::Warning as u8, None),
531            Severity::Error => (KmsgLevel::Error as u8, None),
532            Severity::Fatal => (KmsgLevel::Critical as u8, None),
533        },
534    };
535
536    // TODO(https://fxbug.dev/433724019): this isn't correct strictly speaking, but will be in most
537    // cases. We unapply the *current* offset and in the case where suspension happened between
538    // when the log message was generated and when Starnix is forwarding the log message, this will
539    // be different from the *actual* offset prior to suspension.
540    let time = state.lock().boot_time_to_monotonic_time(data.metadata.timestamp);
541    let time_nanos = time.into_nanos();
542    let time_secs = time_nanos / NANOS_PER_SECOND;
543    // Microsecond-level precision fractional time.
544    let time_fract = (time_nanos % NANOS_PER_SECOND) / MICROS_PER_NANOSECOND;
545    let component_name = data.component_name();
546    write!(&mut result, "<{level}>[{time_secs:05}.{time_fract:06}] {component_name}",)?;
547
548    match data.metadata.pid {
549        Some(pid) => write!(&mut result, "[{pid}]: ")?,
550        None => write!(&mut result, ": ")?,
551    }
552
553    result.append(&mut formatted_tags);
554
555    if let Some(msg) = msg_after_level {
556        write!(&mut result, "{}", String::from_utf8_lossy(msg))?;
557    } else if let Some(msg) = data.msg() {
558        write!(&mut result, "{msg}")?;
559    }
560
561    for kvp in data.payload_keys_strings() {
562        write!(&mut result, " {kvp}")?;
563    }
564    write!(&mut result, "\n")?;
565    Ok(Some(result))
566}
567
/// Unit tests for the pure helpers in this file: `ResultBuffer` and `extract_level`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that output is trimmed to the budget and that messages which fit are kept in order.
    #[test]
    fn test_result_buffer() {
        // A single message larger than the budget is cut down to the budget.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(vec![0; 200]);
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 100);

        // Messages that fit are concatenated in insertion order.
        let mut buffer = ResultBuffer::new(100);
        buffer.push(Vec::from_iter(0..20));
        buffer.push(Vec::from_iter(20..50));
        let result: Vec<u8> = buffer.into();
        assert_eq!(result.len(), 50);
        for i in 0..50u8 {
            assert_eq!(result[i as usize], i);
        }
    }

    /// Every level 0..=7 is parsed and the remainder of the message is returned untouched.
    #[test]
    fn test_extract_level() {
        for level in 0..8 {
            let msg = format!("<{level}> some message");
            let result = extract_level(msg.as_bytes()).map(|(x, i)| (x as u8, i));
            assert_eq!(Some((level, " some message".as_bytes())), result);
        }
    }

    /// Only the lower three bits are used: 100 & 0x07 == 4, which is `Warning`.
    #[test]
    fn parse_message_accepts_levels_greater_than_7() {
        assert_eq!(
            Some((KmsgLevel::Warning, " message".as_bytes())),
            extract_level("<100> message".as_bytes())
        );
    }

    /// A non-numeric prefix yields no level.
    #[test]
    fn parse_message_defaults_when_non_numbers() {
        assert_eq!(None, extract_level("<a> some message".as_bytes()));
    }

    /// A prefix without the closing '>' yields no level.
    #[test]
    fn parse_message_defaults_when_invalid_level_syntax() {
        assert_eq!(None, extract_level("<1 some message".as_bytes()));
    }

    /// A message without any prefix yields no level.
    #[test]
    fn parse_message_defaults_when_no_level() {
        assert_eq!(None, extract_level("some message".as_bytes()));
    }
}