1use crate::security;
6use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
7use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
8use crate::vfs::OutputBuffer;
9use diagnostics_data::{Data, Logs, LogsData, Severity};
10use diagnostics_message::from_extended_record;
11use estimate_timeline::{DefaultFetcher, TimeFetcher, TimelineEstimator};
12use fidl_fuchsia_diagnostics as fdiagnostics;
13use fuchsia_component::client::connect_to_protocol_sync;
14use fuchsia_inspect::Inspector;
15use futures::FutureExt;
16use serde::Deserialize;
17use starnix_sync::{Locked, Mutex, Unlocked};
18use starnix_uapi::auth::CAP_SYSLOG;
19use starnix_uapi::errors::{EAGAIN, Errno, errno, error};
20use starnix_uapi::syslog::SyslogAction;
21use starnix_uapi::vfs::FdEvents;
22use std::cmp;
23use std::collections::VecDeque;
24use std::io::{self, Write};
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, OnceLock, mpsc};
27
28const BUFFER_SIZE: i32 = 1_049_000;
29
30const NANOS_PER_SECOND: i64 = 1_000_000_000;
31const MICROS_PER_NANOSECOND: i64 = 1_000;
32
33#[derive(Default)]
34pub struct Syslog {
35 syscall_subscription: OnceLock<Mutex<LogSubscription>>,
36 state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
37}
38
39#[derive(Debug)]
40pub enum SyslogAccess {
41 DevKmsgRead,
42 ProcKmsg(SyslogAction),
43 Syscall(SyslogAction),
44}
45
46impl Syslog {
47 pub fn init(&self, system_task: &CurrentTask) -> Result<(), anyhow::Error> {
48 let state = self.state.clone();
49 system_task.kernel.inspect_node.record_lazy_child("syslog", move || {
50 let state = state.clone();
51 async move {
52 let inspector = Inspector::default();
53 let state_guard = state.lock();
54 inspector.root().record_uint("max_timeline_size", state_guard.max_timeline_size());
55 inspector
56 .root()
57 .record_uint("timeline_overflows", state_guard.timeline_overflows());
58 Ok(inspector)
59 }
60 .boxed()
61 });
62
63 let subscription = LogSubscription::snapshot_then_subscribe(system_task)?;
64 self.syscall_subscription.set(Mutex::new(subscription)).expect("syslog inititialized once");
65 Ok(())
66 }
67
68 pub fn access(
69 &self,
70 current_task: &CurrentTask,
71 access: SyslogAccess,
72 ) -> Result<GrantedSyslog<'_>, Errno> {
73 Self::validate_access(current_task, access)?;
74 let syscall_subscription = self.subscription()?;
75 Ok(GrantedSyslog { syscall_subscription })
76 }
77
78 pub fn validate_access(current_task: &CurrentTask, access: SyslogAccess) -> Result<(), Errno> {
81 let (action, check_capabilities) = match access {
82 SyslogAccess::ProcKmsg(SyslogAction::Open) => (SyslogAction::Open, true),
83 SyslogAccess::DevKmsgRead => (SyslogAction::ReadAll, true),
84 SyslogAccess::Syscall(a) => (a, true),
85 SyslogAccess::ProcKmsg(a) => (a, false),
87 };
88
89 let action_is_privileged = !matches!(
92 access,
93 SyslogAccess::Syscall(SyslogAction::ReadAll | SyslogAction::SizeBuffer)
94 | SyslogAccess::DevKmsgRead,
95 );
96 let restrict_dmesg = current_task.kernel().restrict_dmesg.load(Ordering::Relaxed);
97 if check_capabilities && (action_is_privileged || restrict_dmesg) {
98 security::check_task_capable(current_task, CAP_SYSLOG)?;
99 }
100
101 security::check_syslog_access(current_task, action)?;
102 Ok(())
103 }
104
105 pub fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
106 LogSubscription::snapshot_then_subscribe(current_task)
107 }
108
109 pub fn subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
110 LogSubscription::subscribe(current_task)
111 }
112
113 fn subscription(&self) -> Result<&Mutex<LogSubscription>, Errno> {
114 self.syscall_subscription.get().ok_or_else(|| errno!(ENOENT))
115 }
116}
117
118pub struct GrantedSyslog<'a> {
119 syscall_subscription: &'a Mutex<LogSubscription>,
120}
121
122impl GrantedSyslog<'_> {
123 pub fn read(&self, out: &mut dyn OutputBuffer) -> Result<i32, Errno> {
124 let mut subscription = self.syscall_subscription.lock();
125 if let Some(log) = subscription.try_next()? {
126 let size_to_write = cmp::min(log.len(), out.available() as usize);
127 out.write(&log[..size_to_write])?;
128 return Ok(size_to_write as i32);
129 }
130 Ok(0)
131 }
132
133 pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
134 self.syscall_subscription.lock().wait(waiter, events, handler)
135 }
136
137 pub fn blocking_read(
138 &self,
139 locked: &mut Locked<Unlocked>,
140 current_task: &CurrentTask,
141 out: &mut dyn OutputBuffer,
142 ) -> Result<i32, Errno> {
143 let mut subscription = self.syscall_subscription.lock();
144 let mut write_log = |log: Vec<u8>| {
145 let size_to_write = cmp::min(log.len(), out.available() as usize);
146 out.write(&log[..size_to_write])?;
147 Ok(size_to_write as i32)
148 };
149 match subscription.try_next() {
150 Err(errno) if errno == EAGAIN => {}
151 Err(errno) => return Err(errno),
152 Ok(Some(log)) => return write_log(log),
153 Ok(None) => return Ok(0),
154 }
155 let waiter = Waiter::new();
156 loop {
157 let _w = subscription.wait(
158 &waiter,
159 FdEvents::POLLIN | FdEvents::POLLHUP,
160 WaitCallback::none(),
161 );
162 match subscription.try_next() {
163 Err(errno) if errno == EAGAIN => {}
164 Err(errno) => return Err(errno),
165 Ok(Some(log)) => return write_log(log),
166 Ok(None) => return Ok(0),
167 }
168 waiter.wait(locked, current_task)?;
169 }
170 }
171
172 pub fn read_all(
173 &self,
174 current_task: &CurrentTask,
175 out: &mut dyn OutputBuffer,
176 ) -> Result<i32, Errno> {
177 let mut subscription = LogSubscription::snapshot(current_task)?;
178 let mut buffer = ResultBuffer::new(out.available());
179 while let Some(log_result) = subscription.next() {
180 buffer.push(log_result?);
181 }
182 let result: Vec<u8> = buffer.into();
183 out.write(result.as_slice())?;
184 Ok(result.len() as i32)
185 }
186
187 pub fn size_unread(&self) -> Result<i32, Errno> {
188 let mut subscription = self.syscall_subscription.lock();
189 Ok(subscription.available()?.try_into().unwrap_or(std::i32::MAX))
190 }
191
192 pub fn size_buffer(&self) -> Result<i32, Errno> {
193 Ok(BUFFER_SIZE)
195 }
196}
197
198#[derive(Debug)]
199pub struct LogSubscription {
200 pending: Option<Vec<u8>>,
201 receiver: mpsc::Receiver<Result<Vec<u8>, Errno>>,
202 waiters: Arc<WaitQueue>,
203}
204
205#[derive(Debug, Deserialize)]
206#[serde(untagged)]
207enum OneOrMany<T> {
208 Many(Vec<T>),
209 One(T),
210}
211
212impl LogSubscription {
213 pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
214 self.waiters.wait_async_fd_events(waiter, events, handler)
215 }
216
217 pub fn available(&mut self) -> Result<usize, Errno> {
218 if let Some(log) = &self.pending {
219 return Ok(log.len());
220 }
221 match self.try_next() {
222 Err(err) if err == EAGAIN => Ok(0),
223 Err(err) => Err(err),
224 Ok(Some(log)) => {
225 let size = log.len();
226 self.pending.replace(log);
227 return Ok(size);
228 }
229 Ok(None) => Ok(0),
230 }
231 }
232
233 fn snapshot(current_task: &CurrentTask) -> Result<LogIterator, Errno> {
234 LogIterator::new(¤t_task.kernel.syslog, fdiagnostics::StreamMode::Snapshot)
235 }
236
237 fn subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
238 Self::new_listening(current_task, fdiagnostics::StreamMode::Subscribe)
239 }
240
241 fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
242 Self::new_listening(current_task, fdiagnostics::StreamMode::SnapshotThenSubscribe)
243 }
244
245 fn new_listening(
246 current_task: &CurrentTask,
247 mode: fdiagnostics::StreamMode,
248 ) -> Result<Self, Errno> {
249 let iterator = LogIterator::new(¤t_task.kernel.syslog, mode)?;
250 let (snd, receiver) = mpsc::sync_channel(1);
251 let waiters = Arc::new(WaitQueue::default());
252 let waiters_clone = waiters.clone();
253 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
254 scopeguard::defer! {
255 waiters_clone.notify_fd_events(FdEvents::POLLHUP);
256 };
257 for log in iterator {
258 if snd.send(log).is_err() {
259 break;
260 };
261 waiters_clone.notify_fd_events(FdEvents::POLLIN);
262 }
263 };
264 let req = SpawnRequestBuilder::new()
265 .with_debug_name("syslog-listener")
266 .with_sync_closure(closure)
267 .build();
268 current_task.kernel().kthreads.spawner().spawn_from_request(req);
269
270 Ok(Self { receiver, waiters, pending: Default::default() })
271 }
272
273 fn try_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
274 if let Some(value) = self.pending.take() {
275 return Ok(Some(value));
276 }
277 match self.receiver.try_recv() {
278 Ok(Ok(log)) => Ok(Some(log)),
280 Ok(Err(err)) => Err(err),
282 Err(mpsc::TryRecvError::Disconnected) => Ok(None),
284 Err(mpsc::TryRecvError::Empty) => error!(EAGAIN),
286 }
287 }
288}
289
290struct LogIterator {
291 iterator: fdiagnostics::BatchIteratorSynchronousProxy,
292 pending_formatted_contents: VecDeque<fdiagnostics::FormattedContent>,
293 pending_datas: VecDeque<Data<Logs>>,
294 state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
295}
296
297impl LogIterator {
298 fn new(syslog: &Syslog, mode: fdiagnostics::StreamMode) -> Result<Self, Errno> {
299 let accessor = connect_to_protocol_sync::<fdiagnostics::ArchiveAccessorMarker>()
300 .map_err(|_| errno!(ENOENT, format!("Failed to connecto to ArchiveAccessor")))?;
301 let is_subscribe = matches!(mode, fdiagnostics::StreamMode::Subscribe);
302 let stream_parameters = fdiagnostics::StreamParameters {
303 stream_mode: Some(mode),
304 data_type: Some(fdiagnostics::DataType::Logs),
305 format: Some(fdiagnostics::Format::Fxt),
306 client_selector_configuration: Some(
307 fdiagnostics::ClientSelectorConfiguration::SelectAll(true),
308 ),
309 ..fdiagnostics::StreamParameters::default()
310 };
311 let (client_end, server_end) =
312 fidl::endpoints::create_endpoints::<fdiagnostics::BatchIteratorMarker>();
313 accessor.stream_diagnostics(&stream_parameters, server_end).map_err(|err| {
314 errno!(EIO, format!("ArchiveAccessor/StreamDiagnostics failed: {err}"))
315 })?;
316 let iterator = fdiagnostics::BatchIteratorSynchronousProxy::new(client_end.into_channel());
317 if is_subscribe {
318 let () = iterator.wait_for_ready(zx::MonotonicInstant::INFINITE).map_err(|err| {
319 errno!(EIO, format!("Failed to wait for BatchIterator being ready: {err}"))
320 })?;
321 }
322 Ok(Self {
323 iterator,
324 pending_formatted_contents: VecDeque::new(),
325 pending_datas: VecDeque::new(),
326 state: syslog.state.clone(),
327 })
328 }
329
330 fn get_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
333 'main_loop: loop {
334 while let Some(data) = self.pending_datas.pop_front() {
335 if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
336 return Ok(Some(log));
337 }
338 }
339 while let Some(formatted_content) = self.pending_formatted_contents.pop_front() {
340 let output: OneOrMany<Data<Logs>> = match formatted_content {
341 fdiagnostics::FormattedContent::Fxt(data) => {
342 let buf = data
343 .read_to_vec(
344 0,
345 data.get_content_size().map_err(|a| {
346 errno!(EIO, format!("Error {a} getting VMO size"))
347 })?,
348 )
349 .map_err(|err| {
350 errno!(EIO, format!("failed to read logs vmo: {err}"))
351 })?;
352 let mut current_slice = buf.as_ref();
353 let mut ret: Option<OneOrMany<LogsData>> = None;
354 loop {
355 let (data, remaining) = from_extended_record(current_slice)
356 .map_err(|a| errno!(EIO, format!("Error {a} parsing FXT")))?;
357 ret = Some(match ret.take() {
358 Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
359 Some(OneOrMany::Many(mut many)) => {
360 many.push(data);
361 OneOrMany::Many(many)
362 }
363 None => OneOrMany::One(data),
364 });
365 if remaining.is_empty() {
366 break;
367 }
368 current_slice = remaining;
369 }
370 ret.ok_or_else(|| errno!(EIO, format!("archivist returned invalid data")))?
371 }
372 format => {
373 unreachable!("we only request and expect one format. Got: {format:?}")
374 }
375 };
376 match output {
377 OneOrMany::One(data) => {
378 if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
379 return Ok(Some(log));
380 }
381 }
382 OneOrMany::Many(datas) => {
383 if datas.len() > 0 {
384 self.pending_datas.extend(datas);
385 continue 'main_loop;
386 }
387 }
388 }
389 }
390 let next_batch = self
391 .iterator
392 .get_next(zx::MonotonicInstant::INFINITE)
393 .map_err(|_| errno!(ENOENT))?
394 .map_err(|_| errno!(ENOENT))?;
395 if next_batch.is_empty() {
396 return Ok(None);
397 }
398 self.pending_formatted_contents = VecDeque::from(next_batch);
399 }
400 }
401}
402
403impl Iterator for LogIterator {
404 type Item = Result<Vec<u8>, Errno>;
405
406 fn next(&mut self) -> Option<Result<Vec<u8>, Errno>> {
407 self.get_next().transpose()
408 }
409}
410
411impl Iterator for LogSubscription {
412 type Item = Result<Vec<u8>, Errno>;
413
414 fn next(&mut self) -> Option<Self::Item> {
415 self.try_next().transpose()
416 }
417}
418
419struct ResultBuffer {
420 max_size: usize,
421 buffer: VecDeque<Vec<u8>>,
422 current_size: usize,
423}
424
425impl ResultBuffer {
426 fn new(max_size: usize) -> Self {
427 Self { max_size, buffer: VecDeque::default(), current_size: 0 }
428 }
429
430 fn push(&mut self, data: Vec<u8>) {
431 while !self.buffer.is_empty() && self.current_size + data.len() > self.max_size {
432 let old = self.buffer.pop_front().unwrap();
433 self.current_size -= old.len();
434 }
435 self.current_size += data.len();
436 self.buffer.push_back(data);
437 }
438}
439
440impl Into<Vec<u8>> for ResultBuffer {
441 fn into(self) -> Vec<u8> {
442 let mut result = Vec::with_capacity(self.current_size);
443 for mut item in self.buffer {
444 result.append(&mut item);
445 }
446 let size = std::cmp::min(result.len(), std::cmp::min(self.max_size, self.current_size));
449 if result.len() != size {
450 result.resize(size, 0);
451 }
452 result
453 }
454}
455
456#[derive(Debug, Eq, PartialEq)]
457#[repr(u8)]
458pub enum KmsgLevel {
459 Emergency = 0,
460 Alert = 1,
461 Critical = 2,
462 Error = 3,
463 Warning = 4,
464 Notice = 5,
465 Info = 6,
466 Debug = 7,
467}
468
469impl KmsgLevel {
470 fn from_raw(value: u8) -> Option<KmsgLevel> {
471 if value < 8 {
472 Some(unsafe { std::mem::transmute(value) })
474 } else {
475 None
476 }
477 }
478}
479
480pub(crate) fn extract_level(msg: &[u8]) -> Option<(KmsgLevel, &[u8])> {
487 let mut bytes_iter = msg.iter();
488 let Some(c) = bytes_iter.next() else {
489 return None;
490 };
491 if *c != b'<' {
492 return None;
493 }
494 let Some(end) = bytes_iter.enumerate().find(|(_, c)| **c == b'>').map(|(i, _)| i + 1) else {
495 return None;
496 };
497 std::str::from_utf8(&msg[1..end])
498 .ok()
499 .and_then(|s| s.parse::<u64>().ok())
500 .map(|level| (level & 0x07) as u8)
501 .and_then(KmsgLevel::from_raw)
502 .map(|level| (level, &msg[end + 1..]))
503}
504
505fn format_log<T: TimeFetcher>(
506 data: Data<Logs>,
507 state: &Arc<Mutex<TimelineEstimator<T>>>,
508) -> Result<Option<Vec<u8>>, io::Error> {
509 let mut formatted_tags = match data.tags() {
510 None => vec![],
511 Some(tags) => {
512 let mut formatted = vec![];
513 for (i, tag) in tags.iter().enumerate() {
514 if tag.contains("fxlogcat") {
516 return Ok(None);
517 }
518 if i != 0 {
519 write!(&mut formatted, ",")?;
520 }
521 write!(&mut formatted, "{tag}")?;
522 }
523 write!(&mut formatted, ": ")?;
524 formatted
525 }
526 };
527
528 let mut result = Vec::<u8>::new();
529 let (level, msg_after_level) = match data.msg().and_then(|msg| extract_level(msg.as_bytes())) {
530 Some((level, remaining_msg)) => (level as u8, Some(remaining_msg)),
531 None => match data.severity() {
532 Severity::Trace | Severity::Debug => (KmsgLevel::Debug as u8, None),
533 Severity::Info => (KmsgLevel::Info as u8, None),
534 Severity::Warn => (KmsgLevel::Warning as u8, None),
535 Severity::Error => (KmsgLevel::Error as u8, None),
536 Severity::Fatal => (KmsgLevel::Critical as u8, None),
537 },
538 };
539
540 let time = state.lock().boot_time_to_monotonic_time(data.metadata.timestamp);
545 let time_nanos = time.into_nanos();
546 let time_secs = time_nanos / NANOS_PER_SECOND;
547 let time_fract = (time_nanos % NANOS_PER_SECOND) / MICROS_PER_NANOSECOND;
549 let component_name = data.component_name();
550 write!(&mut result, "<{level}>[{time_secs:05}.{time_fract:06}] {component_name}",)?;
551
552 match data.metadata.pid {
553 Some(pid) => write!(&mut result, "[{pid}]: ")?,
554 None => write!(&mut result, ": ")?,
555 }
556
557 result.append(&mut formatted_tags);
558
559 if let Some(msg) = msg_after_level {
560 write!(&mut result, "{}", String::from_utf8_lossy(msg))?;
561 } else if let Some(msg) = data.msg() {
562 write!(&mut result, "{msg}")?;
563 }
564
565 for kvp in data.payload_keys_strings() {
566 write!(&mut result, " {kvp}")?;
567 }
568 write!(&mut result, "\n")?;
569 Ok(Some(result))
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575
576 #[test]
577 fn test_result_buffer() {
578 let mut buffer = ResultBuffer::new(100);
579 buffer.push(vec![0; 200]);
580 let result: Vec<u8> = buffer.into();
581 assert_eq!(result.len(), 100);
582
583 let mut buffer = ResultBuffer::new(100);
584 buffer.push(Vec::from_iter(0..20));
585 buffer.push(Vec::from_iter(20..50));
586 let result: Vec<u8> = buffer.into();
587 assert_eq!(result.len(), 50);
588 for i in 0..50u8 {
589 assert_eq!(result[i as usize], i);
590 }
591 }
592
593 #[test]
594 fn test_extract_level() {
595 for level in 0..8 {
596 let msg = format!("<{level}> some message");
597 let result = extract_level(msg.as_bytes()).map(|(x, i)| (x as u8, i));
598 assert_eq!(Some((level, " some message".as_bytes())), result);
599 }
600 }
601
602 #[test]
603 fn parse_message_accepts_levels_greater_than_7() {
604 assert_eq!(
605 Some((KmsgLevel::Warning, " message".as_bytes())),
606 extract_level("<100> message".as_bytes())
607 );
608 }
609
610 #[test]
611 fn parse_message_defaults_when_non_numbers() {
612 assert_eq!(None, extract_level("<a> some message".as_bytes()));
613 }
614
615 #[test]
616 fn parse_message_defaults_when_invalid_level_syntax() {
617 assert_eq!(None, extract_level("<1 some message".as_bytes()));
618 }
619
620 #[test]
621 fn parse_message_defaults_when_no_level() {
622 assert_eq!(None, extract_level("some message".as_bytes()));
623 }
624}