1use crate::security;
6use crate::task::dynamic_thread_spawner::SpawnRequestBuilder;
7use crate::task::{CurrentTask, EventHandler, WaitCallback, WaitCanceler, WaitQueue, Waiter};
8use crate::vfs::OutputBuffer;
9use diagnostics_data::{Data, Logs, LogsData, Severity};
10use diagnostics_message::from_extended_record;
11use estimate_timeline::{DefaultFetcher, TimeFetcher, TimelineEstimator};
12use fidl_fuchsia_diagnostics as fdiagnostics;
13use fuchsia_component::client::connect_to_protocol_sync;
14use fuchsia_inspect::Inspector;
15use futures::FutureExt;
16use serde::Deserialize;
17use starnix_sync::{Locked, Mutex, Unlocked};
18use starnix_uapi::auth::CAP_SYSLOG;
19use starnix_uapi::errors::{EAGAIN, Errno, errno, error};
20use starnix_uapi::syslog::SyslogAction;
21use starnix_uapi::vfs::FdEvents;
22use std::cmp;
23use std::collections::VecDeque;
24use std::io::{self, Write};
25use std::sync::atomic::Ordering;
26use std::sync::{Arc, OnceLock, mpsc};
27use zerocopy::{Immutable, IntoBytes, KnownLayout, TryFromBytes};
28
/// Value reported by `GrantedSyslog::size_buffer` as the size of the log buffer.
const BUFFER_SIZE: i32 = 1_049_000;

/// Number of nanoseconds in one second.
const NANOS_PER_SECOND: i64 = 1_000_000_000;
/// Divisor used by `format_log` to turn sub-second nanoseconds into microseconds.
///
/// NOTE(review): the name reads inverted; the value is the number of nanoseconds per
/// microsecond.
const MICROS_PER_NANOSECOND: i64 = 1_000;
33
34#[derive(Default)]
35pub struct Syslog {
36 syscall_subscription: OnceLock<Mutex<LogSubscription>>,
37 state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
38}
39
40#[derive(Debug)]
41pub enum SyslogAccess {
42 DevKmsgRead,
43 ProcKmsg(SyslogAction),
44 Syscall(SyslogAction),
45}
46
47impl Syslog {
48 pub fn init(&self, system_task: &CurrentTask) -> Result<(), anyhow::Error> {
49 let state = self.state.clone();
50 system_task.kernel.inspect_node.record_lazy_child("syslog", move || {
51 let state = state.clone();
52 async move {
53 let inspector = Inspector::default();
54 let state_guard = state.lock();
55 inspector.root().record_uint("max_timeline_size", state_guard.max_timeline_size());
56 inspector
57 .root()
58 .record_uint("timeline_overflows", state_guard.timeline_overflows());
59 Ok(inspector)
60 }
61 .boxed()
62 });
63
64 let subscription = LogSubscription::snapshot_then_subscribe(system_task)?;
65 self.syscall_subscription.set(Mutex::new(subscription)).expect("syslog inititialized once");
66 Ok(())
67 }
68
69 pub fn access(
70 &self,
71 current_task: &CurrentTask,
72 access: SyslogAccess,
73 ) -> Result<GrantedSyslog<'_>, Errno> {
74 Self::validate_access(current_task, access)?;
75 let syscall_subscription = self.subscription()?;
76 Ok(GrantedSyslog { syscall_subscription })
77 }
78
79 pub fn validate_access(current_task: &CurrentTask, access: SyslogAccess) -> Result<(), Errno> {
82 let (action, check_capabilities) = match access {
83 SyslogAccess::ProcKmsg(SyslogAction::Open) => (SyslogAction::Open, true),
84 SyslogAccess::DevKmsgRead => (SyslogAction::ReadAll, true),
85 SyslogAccess::Syscall(a) => (a, true),
86 SyslogAccess::ProcKmsg(a) => (a, false),
88 };
89
90 let action_is_privileged = !matches!(
93 access,
94 SyslogAccess::Syscall(SyslogAction::ReadAll | SyslogAction::SizeBuffer)
95 | SyslogAccess::DevKmsgRead,
96 );
97 let restrict_dmesg = current_task.kernel().restrict_dmesg.load(Ordering::Relaxed);
98 if check_capabilities && (action_is_privileged || restrict_dmesg) {
99 security::check_task_capable(current_task, CAP_SYSLOG)?;
100 }
101
102 security::check_syslog_access(current_task, action)?;
103 Ok(())
104 }
105
106 pub fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
107 LogSubscription::snapshot_then_subscribe(current_task)
108 }
109
110 pub fn subscribe(current_task: &CurrentTask) -> Result<LogSubscription, Errno> {
111 LogSubscription::subscribe(current_task)
112 }
113
114 fn subscription(&self) -> Result<&Mutex<LogSubscription>, Errno> {
115 self.syscall_subscription.get().ok_or_else(|| errno!(ENOENT))
116 }
117}
118
119pub struct GrantedSyslog<'a> {
120 syscall_subscription: &'a Mutex<LogSubscription>,
121}
122
123impl GrantedSyslog<'_> {
124 pub fn read(&self, out: &mut dyn OutputBuffer) -> Result<i32, Errno> {
125 let mut subscription = self.syscall_subscription.lock();
126 if let Some(log) = subscription.try_next()? {
127 let size_to_write = cmp::min(log.len(), out.available() as usize);
128 out.write(&log[..size_to_write])?;
129 return Ok(size_to_write as i32);
130 }
131 Ok(0)
132 }
133
134 pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
135 self.syscall_subscription.lock().wait(waiter, events, handler)
136 }
137
138 pub fn blocking_read(
139 &self,
140 locked: &mut Locked<Unlocked>,
141 current_task: &CurrentTask,
142 out: &mut dyn OutputBuffer,
143 ) -> Result<i32, Errno> {
144 let mut subscription = self.syscall_subscription.lock();
145 let mut write_log = |log: Vec<u8>| {
146 let size_to_write = cmp::min(log.len(), out.available() as usize);
147 out.write(&log[..size_to_write])?;
148 Ok(size_to_write as i32)
149 };
150 match subscription.try_next() {
151 Err(errno) if errno == EAGAIN => {}
152 Err(errno) => return Err(errno),
153 Ok(Some(log)) => return write_log(log),
154 Ok(None) => return Ok(0),
155 }
156 let waiter = Waiter::new();
157 loop {
158 let _w = subscription.wait(
159 &waiter,
160 FdEvents::POLLIN | FdEvents::POLLHUP,
161 WaitCallback::none(),
162 );
163 match subscription.try_next() {
164 Err(errno) if errno == EAGAIN => {}
165 Err(errno) => return Err(errno),
166 Ok(Some(log)) => return write_log(log),
167 Ok(None) => return Ok(0),
168 }
169 waiter.wait(locked, current_task)?;
170 }
171 }
172
173 pub fn read_all(
174 &self,
175 current_task: &CurrentTask,
176 out: &mut dyn OutputBuffer,
177 ) -> Result<i32, Errno> {
178 let mut subscription = LogSubscription::snapshot(current_task)?;
179 let mut buffer = ResultBuffer::new(out.available());
180 while let Some(log_result) = subscription.next() {
181 buffer.push(log_result?);
182 }
183 let result: Vec<u8> = buffer.into();
184 out.write(result.as_slice())?;
185 Ok(result.len() as i32)
186 }
187
188 pub fn size_unread(&self) -> Result<i32, Errno> {
189 let mut subscription = self.syscall_subscription.lock();
190 Ok(subscription.available()?.try_into().unwrap_or(std::i32::MAX))
191 }
192
193 pub fn size_buffer(&self) -> Result<i32, Errno> {
194 Ok(BUFFER_SIZE)
196 }
197}
198
199#[derive(Debug)]
200pub struct LogSubscription {
201 pending: Option<Vec<u8>>,
202 receiver: mpsc::Receiver<Result<Vec<u8>, Errno>>,
203 waiters: Arc<WaitQueue>,
204}
205
206#[derive(Debug, Deserialize)]
207#[serde(untagged)]
208enum OneOrMany<T> {
209 Many(Vec<T>),
210 One(T),
211}
212
213impl LogSubscription {
214 pub fn wait(&self, waiter: &Waiter, events: FdEvents, handler: EventHandler) -> WaitCanceler {
215 self.waiters.wait_async_fd_events(waiter, events, handler)
216 }
217
218 pub fn available(&mut self) -> Result<usize, Errno> {
219 if let Some(log) = &self.pending {
220 return Ok(log.len());
221 }
222 match self.try_next() {
223 Err(err) if err == EAGAIN => Ok(0),
224 Err(err) => Err(err),
225 Ok(Some(log)) => {
226 let size = log.len();
227 self.pending.replace(log);
228 return Ok(size);
229 }
230 Ok(None) => Ok(0),
231 }
232 }
233
234 fn snapshot(current_task: &CurrentTask) -> Result<LogIterator, Errno> {
235 LogIterator::new(¤t_task.kernel.syslog, fdiagnostics::StreamMode::Snapshot)
236 }
237
238 fn subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
239 Self::new_listening(current_task, fdiagnostics::StreamMode::Subscribe)
240 }
241
242 fn snapshot_then_subscribe(current_task: &CurrentTask) -> Result<Self, Errno> {
243 Self::new_listening(current_task, fdiagnostics::StreamMode::SnapshotThenSubscribe)
244 }
245
246 fn new_listening(
247 current_task: &CurrentTask,
248 mode: fdiagnostics::StreamMode,
249 ) -> Result<Self, Errno> {
250 let iterator = LogIterator::new(¤t_task.kernel.syslog, mode)?;
251 let (snd, receiver) = mpsc::sync_channel(1);
252 let waiters = Arc::new(WaitQueue::default());
253 let waiters_clone = waiters.clone();
254 let closure = move |_: &mut Locked<Unlocked>, _: &CurrentTask| {
255 scopeguard::defer! {
256 waiters_clone.notify_fd_events(FdEvents::POLLHUP);
257 };
258 for log in iterator {
259 if snd.send(log).is_err() {
260 break;
261 };
262 waiters_clone.notify_fd_events(FdEvents::POLLIN);
263 }
264 };
265 let req = SpawnRequestBuilder::new()
266 .with_debug_name("syslog-listener")
267 .with_sync_closure(closure)
268 .build();
269 current_task.kernel().kthreads.spawner().spawn_from_request(req);
270
271 Ok(Self { receiver, waiters, pending: Default::default() })
272 }
273
274 fn try_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
275 if let Some(value) = self.pending.take() {
276 return Ok(Some(value));
277 }
278 match self.receiver.try_recv() {
279 Ok(Ok(log)) => Ok(Some(log)),
281 Ok(Err(err)) => Err(err),
283 Err(mpsc::TryRecvError::Disconnected) => Ok(None),
285 Err(mpsc::TryRecvError::Empty) => error!(EAGAIN),
287 }
288 }
289}
290
291struct LogIterator {
292 iterator: fdiagnostics::BatchIteratorSynchronousProxy,
293 pending_formatted_contents: VecDeque<fdiagnostics::FormattedContent>,
294 pending_datas: VecDeque<Data<Logs>>,
295 state: Arc<Mutex<TimelineEstimator<DefaultFetcher>>>,
296}
297
298impl LogIterator {
299 fn new(syslog: &Syslog, mode: fdiagnostics::StreamMode) -> Result<Self, Errno> {
300 let accessor = connect_to_protocol_sync::<fdiagnostics::ArchiveAccessorMarker>()
301 .map_err(|_| errno!(ENOENT, format!("Failed to connecto to ArchiveAccessor")))?;
302 let is_subscribe = matches!(mode, fdiagnostics::StreamMode::Subscribe);
303 let stream_parameters = fdiagnostics::StreamParameters {
304 stream_mode: Some(mode),
305 data_type: Some(fdiagnostics::DataType::Logs),
306 format: Some(fdiagnostics::Format::Fxt),
307 client_selector_configuration: Some(
308 fdiagnostics::ClientSelectorConfiguration::SelectAll(true),
309 ),
310 ..fdiagnostics::StreamParameters::default()
311 };
312 let (client_end, server_end) =
313 fidl::endpoints::create_endpoints::<fdiagnostics::BatchIteratorMarker>();
314 accessor.stream_diagnostics(&stream_parameters, server_end).map_err(|err| {
315 errno!(EIO, format!("ArchiveAccessor/StreamDiagnostics failed: {err}"))
316 })?;
317 let iterator = fdiagnostics::BatchIteratorSynchronousProxy::new(client_end.into_channel());
318 if is_subscribe {
319 let () = iterator.wait_for_ready(zx::MonotonicInstant::INFINITE).map_err(|err| {
320 errno!(EIO, format!("Failed to wait for BatchIterator being ready: {err}"))
321 })?;
322 }
323 Ok(Self {
324 iterator,
325 pending_formatted_contents: VecDeque::new(),
326 pending_datas: VecDeque::new(),
327 state: syslog.state.clone(),
328 })
329 }
330
331 fn get_next(&mut self) -> Result<Option<Vec<u8>>, Errno> {
334 'main_loop: loop {
335 while let Some(data) = self.pending_datas.pop_front() {
336 if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
337 return Ok(Some(log));
338 }
339 }
340 while let Some(formatted_content) = self.pending_formatted_contents.pop_front() {
341 let output: OneOrMany<Data<Logs>> = match formatted_content {
342 fdiagnostics::FormattedContent::Fxt(data) => {
343 let buf = data
344 .read_to_vec(
345 0,
346 data.get_content_size().map_err(|a| {
347 errno!(EIO, format!("Error {a} getting VMO size"))
348 })?,
349 )
350 .map_err(|err| {
351 errno!(EIO, format!("failed to read logs vmo: {err}"))
352 })?;
353 let mut current_slice = buf.as_ref();
354 let mut ret: Option<OneOrMany<LogsData>> = None;
355 loop {
356 let (data, remaining) = from_extended_record(current_slice)
357 .map_err(|a| errno!(EIO, format!("Error {a} parsing FXT")))?;
358 ret = Some(match ret.take() {
359 Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
360 Some(OneOrMany::Many(mut many)) => {
361 many.push(data);
362 OneOrMany::Many(many)
363 }
364 None => OneOrMany::One(data),
365 });
366 if remaining.is_empty() {
367 break;
368 }
369 current_slice = remaining;
370 }
371 ret.ok_or_else(|| errno!(EIO, format!("archivist returned invalid data")))?
372 }
373 format => {
374 unreachable!("we only request and expect one format. Got: {format:?}")
375 }
376 };
377 match output {
378 OneOrMany::One(data) => {
379 if let Some(log) = format_log(data, &self.state).map_err(|_| errno!(EIO))? {
380 return Ok(Some(log));
381 }
382 }
383 OneOrMany::Many(datas) => {
384 if datas.len() > 0 {
385 self.pending_datas.extend(datas);
386 continue 'main_loop;
387 }
388 }
389 }
390 }
391 let next_batch = self
392 .iterator
393 .get_next(zx::MonotonicInstant::INFINITE)
394 .map_err(|_| errno!(ENOENT))?
395 .map_err(|_| errno!(ENOENT))?;
396 if next_batch.is_empty() {
397 return Ok(None);
398 }
399 self.pending_formatted_contents = VecDeque::from(next_batch);
400 }
401 }
402}
403
404impl Iterator for LogIterator {
405 type Item = Result<Vec<u8>, Errno>;
406
407 fn next(&mut self) -> Option<Result<Vec<u8>, Errno>> {
408 self.get_next().transpose()
409 }
410}
411
412impl Iterator for LogSubscription {
413 type Item = Result<Vec<u8>, Errno>;
414
415 fn next(&mut self) -> Option<Self::Item> {
416 self.try_next().transpose()
417 }
418}
419
/// Accumulates log entries while keeping only the most recent ones that fit in `max_size`
/// bytes.
struct ResultBuffer {
    /// Maximum number of bytes the flattened result may contain.
    max_size: usize,
    /// Retained entries, oldest first.
    buffer: VecDeque<Vec<u8>>,
    /// Sum of the lengths of the entries in `buffer`.
    current_size: usize,
}

impl ResultBuffer {
    /// Creates an empty buffer limited to `max_size` bytes.
    fn new(max_size: usize) -> Self {
        Self { max_size, buffer: VecDeque::default(), current_size: 0 }
    }

    /// Appends `data`, first evicting the oldest entries until it fits.
    ///
    /// An entry larger than `max_size` is still stored (alone); it is cut down to `max_size`
    /// when the buffer is converted into a `Vec<u8>`.
    fn push(&mut self, data: Vec<u8>) {
        while self.current_size + data.len() > self.max_size {
            let Some(old) = self.buffer.pop_front() else {
                break;
            };
            self.current_size -= old.len();
        }
        self.current_size += data.len();
        self.buffer.push_back(data);
    }
}

/// Flattens the retained entries, oldest first, into at most `max_size` bytes.
///
/// Implemented as `From` so that both `Vec::from(buffer)` and `buffer.into()` are available.
impl From<ResultBuffer> for Vec<u8> {
    fn from(buffer: ResultBuffer) -> Self {
        let ResultBuffer { max_size, buffer: chunks, current_size } = buffer;
        let mut result = Vec::with_capacity(current_size);
        for chunk in &chunks {
            result.extend_from_slice(chunk);
        }
        // Only a single oversized entry can exceed the limit; keep its first `max_size` bytes.
        result.truncate(max_size);
        result
    }
}
456
457#[derive(Debug, Eq, PartialEq, Copy, Clone, KnownLayout, TryFromBytes, Immutable, IntoBytes)]
458#[repr(u8)]
459pub enum KmsgLevel {
460 Emergency = 0,
461 Alert = 1,
462 Critical = 2,
463 Error = 3,
464 Warning = 4,
465 Notice = 5,
466 Info = 6,
467 Debug = 7,
468}
469
470impl KmsgLevel {
471 fn from_raw(value: u8) -> Option<KmsgLevel> {
472 zerocopy::try_transmute!(value).ok()
473 }
474}
475
476pub(crate) fn extract_level(msg: &[u8]) -> Option<(KmsgLevel, &[u8])> {
483 let mut bytes_iter = msg.iter();
484 let Some(c) = bytes_iter.next() else {
485 return None;
486 };
487 if *c != b'<' {
488 return None;
489 }
490 let Some(end) = bytes_iter.enumerate().find(|(_, c)| **c == b'>').map(|(i, _)| i + 1) else {
491 return None;
492 };
493 std::str::from_utf8(&msg[1..end])
494 .ok()
495 .and_then(|s| s.parse::<u64>().ok())
496 .map(|level| (level & 0x07) as u8)
497 .and_then(KmsgLevel::from_raw)
498 .map(|level| (level, &msg[end + 1..]))
499}
500
501fn format_log<T: TimeFetcher>(
502 data: Data<Logs>,
503 state: &Arc<Mutex<TimelineEstimator<T>>>,
504) -> Result<Option<Vec<u8>>, io::Error> {
505 let mut formatted_tags = match data.tags() {
506 None => vec![],
507 Some(tags) => {
508 let mut formatted = vec![];
509 for (i, tag) in tags.iter().enumerate() {
510 if tag.contains("fxlogcat") {
512 return Ok(None);
513 }
514 if i != 0 {
515 write!(&mut formatted, ",")?;
516 }
517 write!(&mut formatted, "{tag}")?;
518 }
519 write!(&mut formatted, ": ")?;
520 formatted
521 }
522 };
523
524 let mut result = Vec::<u8>::new();
525 let (level, msg_after_level) = match data.msg().and_then(|msg| extract_level(msg.as_bytes())) {
526 Some((level, remaining_msg)) => (level as u8, Some(remaining_msg)),
527 None => match data.severity() {
528 Severity::Trace | Severity::Debug => (KmsgLevel::Debug as u8, None),
529 Severity::Info => (KmsgLevel::Info as u8, None),
530 Severity::Warn => (KmsgLevel::Warning as u8, None),
531 Severity::Error => (KmsgLevel::Error as u8, None),
532 Severity::Fatal => (KmsgLevel::Critical as u8, None),
533 },
534 };
535
536 let time = state.lock().boot_time_to_monotonic_time(data.metadata.timestamp);
541 let time_nanos = time.into_nanos();
542 let time_secs = time_nanos / NANOS_PER_SECOND;
543 let time_fract = (time_nanos % NANOS_PER_SECOND) / MICROS_PER_NANOSECOND;
545 let component_name = data.component_name();
546 write!(&mut result, "<{level}>[{time_secs:05}.{time_fract:06}] {component_name}",)?;
547
548 match data.metadata.pid {
549 Some(pid) => write!(&mut result, "[{pid}]: ")?,
550 None => write!(&mut result, ": ")?,
551 }
552
553 result.append(&mut formatted_tags);
554
555 if let Some(msg) = msg_after_level {
556 write!(&mut result, "{}", String::from_utf8_lossy(msg))?;
557 } else if let Some(msg) = data.msg() {
558 write!(&mut result, "{msg}")?;
559 }
560
561 for kvp in data.payload_keys_strings() {
562 write!(&mut result, " {kvp}")?;
563 }
564 write!(&mut result, "\n")?;
565 Ok(Some(result))
566}
567
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_result_buffer() {
        // A single entry larger than the limit is cut down to the limit.
        let mut oversized = ResultBuffer::new(100);
        oversized.push(vec![0; 200]);
        let flattened: Vec<u8> = oversized.into();
        assert_eq!(flattened.len(), 100);

        // Entries that fit are concatenated in insertion order.
        let mut within_limit = ResultBuffer::new(100);
        within_limit.push((0..20).collect());
        within_limit.push((20..50).collect());
        let flattened: Vec<u8> = within_limit.into();
        assert_eq!(flattened.len(), 50);
        assert_eq!(flattened, (0..50u8).collect::<Vec<u8>>());
    }

    #[test]
    fn test_extract_level() {
        for level in 0..8u8 {
            let msg = format!("<{level}> some message");
            let parsed = extract_level(msg.as_bytes())
                .map(|(parsed_level, rest)| (parsed_level as u8, rest));
            assert_eq!(parsed, Some((level, " some message".as_bytes())));
        }
    }

    #[test]
    fn parse_message_accepts_levels_greater_than_7() {
        // 100 & 0x07 == 4, which is `Warning`.
        let parsed = extract_level("<100> message".as_bytes());
        assert_eq!(parsed, Some((KmsgLevel::Warning, " message".as_bytes())));
    }

    #[test]
    fn parse_message_defaults_when_non_numbers() {
        assert_eq!(extract_level("<a> some message".as_bytes()), None);
    }

    #[test]
    fn parse_message_defaults_when_invalid_level_syntax() {
        assert_eq!(extract_level("<1 some message".as_bytes()), None);
    }

    #[test]
    fn parse_message_defaults_when_no_level() {
        assert_eq!(extract_level("some message".as_bytes()), None);
    }
}