1use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, MONIKER, Record, URL};
10use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker, RequestStream};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_diagnostics_types::Severity;
14use fuchsia_async as fasync;
15use futures::{AsyncWriteExt, StreamExt};
16use log::warn;
17use std::collections::HashMap;
18use std::future::poll_fn;
19use std::io::Cursor;
20use std::pin::pin;
21use std::sync::Arc;
22use std::task::{Poll, ready};
23use zerocopy::{FromBytes, IntoBytes};
24
/// Error that ends a manifest-based log stream (see
/// `LogStreamServer::stream_logs_with_manifest`).
///
/// The only variant wraps an I/O error; it is `transparent`, so it displays exactly like the
/// underlying [`std::io::Error`].
#[derive(thiserror::Error, Debug)]
enum StreamError {
    /// An I/O failure. `#[from]` lets `?` convert a `std::io::Error` into this variant.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
30
/// Server for the `LogStream` protocol (`fdiagnostics::LogStreamMarker`).
///
/// Each accepted connection streams log records over a client-provided socket.
pub struct LogStreamServer {
    /// Repository from which a log cursor is created for every `Connect` request.
    logs_repo: Arc<LogsRepository>,

    /// Scope on which the request-handling task and the per-connection streaming tasks are
    /// spawned.
    scope: fasync::Scope,
}
38
39impl LogStreamServer {
40 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
41 Self { logs_repo, scope }
42 }
43
44 pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
46 let logs_repo = Arc::clone(&self.logs_repo);
47 let scope = self.scope.to_handle();
48 self.scope.spawn(async move {
49 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
50 warn!("error handling Log requests: {}", e);
51 }
52 });
53 }
54
55 async fn handle_requests(
58 logs_repo: Arc<LogsRepository>,
59 mut stream: fdiagnostics::LogStreamRequestStream,
60 scope: fasync::ScopeHandle,
61 ) -> Result<(), LogsError> {
62 while let Some(request) = stream.next().await {
63 let request = request.map_err(|source| LogsError::HandlingRequests {
64 protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
65 source,
66 })?;
67
68 match request {
69 fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
70 let logs = logs_repo.logs_cursor_raw(
71 opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
72 Vec::new(),
73 );
74 let opts = ExtendRecordOpts::from(opts);
75 if opts.subscribe_to_manifest {
76 if opts.moniker || opts.component_url || opts.rolled_out {
77 stream.control_handle().shutdown_with_epitaph(zx::Status::INVALID_ARGS);
78 return Ok(());
79 }
80
81 scope.spawn(async move {
82 let _ = Self::stream_logs_with_manifest(
83 fasync::Socket::from_socket(socket),
84 logs,
85 )
86 .await;
87 });
88 } else {
89 scope.spawn(Self::stream_logs(
90 fasync::Socket::from_socket(socket),
91 logs,
92 opts,
93 ));
94 }
95 }
96 fdiagnostics::LogStreamRequest::_UnknownMethod {
97 ordinal,
98 method_type,
99 control_handle,
100 ..
101 } => {
102 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
103 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
104 }
105 }
106 }
107 Ok(())
108 }
109
110 async fn stream_logs(mut socket: fasync::Socket, logs: FilterCursor, opts: ExtendRecordOpts) {
111 let mut logs = pin!(logs);
112 let mut buffer = Vec::new();
113 while let Some((identity, dropped)) = poll_fn(|cx| {
114 loop {
115 match ready!(logs.as_mut().poll_next(cx)) {
116 Some(message) => {
117 let (identity_ref, header, data) = match message.parse() {
118 Ok(res) => res,
119 Err(_) => {
120 continue;
123 }
124 };
125 let identity = Arc::clone(identity_ref);
126 buffer.clear();
127 buffer.reserve(data.len());
128 buffer.extend_from_slice(header.as_bytes());
129 buffer.extend_from_slice(&data[8..]);
130 return Poll::Ready(Some((identity, message.dropped)));
131 }
132 None => return Poll::Ready(None),
133 }
134 }
135 })
136 .await
137 {
138 extend_fxt_record(&identity, dropped, &opts, &mut buffer);
139 if socket.write_all(&buffer).await.is_err() {
140 break;
142 }
143 }
144 }
145
146 async fn stream_logs_with_manifest(
147 mut socket: fasync::Socket,
148 logs: FilterCursor,
149 ) -> Result<(), StreamError> {
150 let mut logs = pin!(logs);
151 let mut sent_tags = HashMap::new();
152 let mut buffer = Vec::new();
153 while let Some((identity, tag)) = poll_fn(|cx| {
154 loop {
155 match ready!(logs.as_mut().poll_next(cx)) {
156 Some(message) => {
157 let (identity_ref, header, data) = match message.parse() {
158 Ok(res) => res,
159 Err(_) => continue,
160 };
161 let identity = Arc::clone(identity_ref);
162 buffer.clear();
163 buffer.reserve(data.len());
164 buffer.extend_from_slice(header.as_bytes());
165 buffer.extend_from_slice(&data[8..]);
166 return Poll::Ready(Some((identity, header.tag() as u64)));
167 }
168 None => return Poll::Ready(None),
169 }
170 }
171 })
172 .await
173 {
174 let send = match sent_tags.entry(tag) {
175 std::collections::hash_map::Entry::Vacant(e) => {
176 e.insert(Arc::clone(&identity));
177 true
178 }
179 std::collections::hash_map::Entry::Occupied(mut e) => {
180 if !Arc::ptr_eq(e.get(), &identity) && **e.get() != *identity {
181 e.insert(Arc::clone(&identity));
182 true
183 } else {
184 false
185 }
186 }
187 };
188
189 if send {
190 Self::send_component_change(&mut socket, tag, &identity).await?;
191 }
192 socket.write_all(&buffer).await?;
193 }
194 Ok(())
195 }
196
197 async fn send_component_change(
198 socket: &mut fasync::Socket,
199 id: u64,
200 identity: &ComponentIdentity,
201 ) -> Result<(), std::io::Error> {
202 let mut encoder =
203 Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
204 let record = Record {
205 timestamp: zx::BootInstant::from_nanos(0),
206 severity: Severity::Info.into_primitive(),
207 arguments: vec![
208 Argument::other(MONIKER, identity.moniker.to_string()),
209 Argument::other(URL, identity.url.as_str()),
210 ],
211 };
212 encoder.write_record(record).map_err(std::io::Error::other)?;
213
214 let mut buffer = encoder.take().into_inner().into_inner();
215 if buffer.len() >= 8 {
216 let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
217 header.set_tag((id as u32) | LOG_CONTROL_BIT);
218 buffer[0..8].copy_from_slice(header.as_bytes());
219 }
220 socket.write_all(&buffer).await?;
221 Ok(())
222 }
223}
224
/// Options selecting what accompanies the log records streamed to a client.
///
/// All options default to `false`.
#[derive(Default)]
pub struct ExtendRecordOpts {
    /// Append the component moniker to every record.
    pub moniker: bool,
    /// Append the component URL to every record.
    pub component_url: bool,
    /// Append the rolled-out count to every record.
    pub rolled_out: bool,
    /// Announce components through separate manifest records instead of extending records.
    pub subscribe_to_manifest: bool,
}

impl ExtendRecordOpts {
    /// Returns `true` when at least one per-record extension (moniker, component URL or
    /// rolled-out count) is enabled. `subscribe_to_manifest` does not count.
    fn should_extend(&self) -> bool {
        // Every field is named here so that adding a field forces this method to be revisited.
        let &Self { moniker, component_url, rolled_out, subscribe_to_manifest: _ } = self;
        moniker || component_url || rolled_out
    }
}
239
240impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
241 fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
242 let fdiagnostics::LogStreamOptions {
243 include_moniker,
244 include_component_url,
245 include_rolled_out,
246 mode: _,
247 __source_breaking: _,
248 subscribe_to_manifest,
249 } = opts;
250 Self {
251 moniker: include_moniker.unwrap_or(false),
252 component_url: include_component_url.unwrap_or(false),
253 rolled_out: include_rolled_out.unwrap_or(false),
254 subscribe_to_manifest: subscribe_to_manifest.unwrap_or(false),
255 }
256 }
257}
258
/// Returns the zero bytes required to pad `len` bytes up to the next multiple of 8.
///
/// The slice is empty when `len` is already a multiple of 8 (including zero) and holds at most
/// 7 bytes otherwise.
fn padding(len: usize) -> &'static [u8] {
    static ZEROS: [u8; 8] = [0; 8];
    let missing = (8 - len % 8) % 8;
    &ZEROS[..missing]
}
263
264pub fn extend_fxt_record(
265 identity: &ComponentIdentity,
266 rolled_out: u64,
267 opts: &ExtendRecordOpts,
268 buffer: &mut Vec<u8>,
269) {
270 if !opts.should_extend() {
271 return;
272 }
273
274 let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
275 let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
276 let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
277
278 let moniker_len = moniker.len() as u32;
279 let component_url_len = component_url.len() as u32;
280
281 buffer.extend_from_slice(&moniker_len.to_le_bytes());
282 buffer.extend_from_slice(&component_url_len.to_le_bytes());
283 buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
284
285 buffer.extend_from_slice(moniker.as_bytes());
286 buffer.extend_from_slice(padding(moniker.len()));
287
288 buffer.extend_from_slice(component_url.as_bytes());
289 buffer.extend_from_slice(padding(component_url.len()));
290}
291
// Tests for the log stream server: two end-to-end tests of manifest-based streaming and a
// table-driven test of the byte layout produced by `extend_fxt_record`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use diagnostics_log_encoding::Value;
    use diagnostics_log_encoding::parse::parse_record;
    use futures::AsyncReadExt;
    use moniker::ExtendedMoniker;
    use test_case::test_case;
    use zx;

    // Connects with `subscribe_to_manifest` and checks that the first record on the socket is a
    // manifest record (control bit set, moniker and URL arguments) and that the following log
    // record carries the same tag without the control bit.
    #[fuchsia::test]
    async fn log_stream_with_manifest() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        ));
        let container = repo.get_log_container(Arc::clone(&identity));
        let container_tag = container.buffer().iob_tag() as u32;

        let scope = fasync::Scope::new();
        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
        let (proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
        server.spawn(stream);

        let (client_socket, server_socket) = zx::Socket::create_stream();
        let mut client_socket = fasync::Socket::from_socket(client_socket);

        let opts = fdiagnostics::LogStreamOptions {
            subscribe_to_manifest: Some(true),
            mode: Some(StreamMode::SnapshotThenSubscribe),
            ..Default::default()
        };
        proxy.connect(server_socket, &opts).expect("connect");

        container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));

        // Everything read from the socket accumulates in `buf[..offset]`.
        let mut buf = vec![0u8; 4096];
        let mut offset = 0;

        // Read until a complete record parses from the start of the buffer. A failed parse is
        // treated as "more data needed".
        let (manifest_record, manifest_len) = loop {
            if offset > 0
                && let Ok((record, rest)) = parse_record(&buf[..offset])
            {
                // Bytes consumed by the record = bytes available minus what the parser left.
                let len = offset - rest.len();
                break (record, len);
            }
            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
            assert!(n > 0, "socket closed before receiving manifest");
            offset += n;
        };

        // The manifest names the component; the moniker "./foo" is reported as "foo".
        assert_eq!(manifest_record.arguments[0].name(), "moniker");
        assert_eq!(manifest_record.arguments[0].value(), Value::Text("foo".into()));
        assert_eq!(manifest_record.arguments[1].name(), "url");
        assert_eq!(manifest_record.arguments[1].value(), Value::Text("fuchsia-pkg://foo".into()));

        // Same approach for the log record, which starts right after the manifest record.
        let (log_record, _log_len) = loop {
            if offset > manifest_len
                && let Ok((record, rest)) = parse_record(&buf[manifest_len..offset])
            {
                let len = offset - manifest_len - rest.len();
                break (record, len);
            }
            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
            assert!(n > 0, "socket closed before receiving log record");
            offset += n;
        };

        // Inspect the raw 8-byte headers to compare the tags of both records.
        let header1 = Header::read_from_bytes(&buf[0..8]).unwrap();
        let tag_id = header1.tag();
        assert_ne!(tag_id & LOG_CONTROL_BIT, 0, "Manifest should have LOG_CONTROL_BIT set");

        let header2 = Header::read_from_bytes(&buf[manifest_len..manifest_len + 8]).unwrap();
        assert_eq!(
            header2.tag() & LOG_CONTROL_BIT,
            0,
            "Log record should NOT have LOG_CONTROL_BIT set"
        );
        assert_eq!(
            header2.tag(),
            tag_id & !LOG_CONTROL_BIT,
            "Log record tag should match Manifest tag ID"
        );
        assert_eq!(header2.tag(), container_tag, "Log record tag ID should equal IOB tag ID");

        // NOTE(review): the message text is expected in the third argument; presumably this
        // layout comes from `make_message` — confirm.
        assert_eq!(log_record.arguments[2].value(), Value::Text("a".into()));
    }

    // Checks that a new manifest record is sent when a tag previously announced for one
    // component is reused by a container for a different component.
    #[fuchsia::test]
    async fn log_stream_with_manifest_reused_tag() {
        use crate::logs::shared_buffer::create_ring_buffer;
        let repo = LogsRepository::new(
            create_ring_buffer(65536),
            std::iter::empty(),
            &Default::default(),
            fasync::Scope::new(),
        );

        let scope = fasync::Scope::new();
        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
        let (proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
        server.spawn(stream);

        let (client_socket, server_socket) = zx::Socket::create_stream();
        let mut client_socket = fasync::Socket::from_socket(client_socket);

        let opts = fdiagnostics::LogStreamOptions {
            subscribe_to_manifest: Some(true),
            mode: Some(StreamMode::SnapshotThenSubscribe),
            ..Default::default()
        };
        proxy.connect(server_socket, &opts).expect("connect");

        // First component: remember its tag so that reuse can be detected later.
        let identity_a = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        ));
        let container_a = repo.get_log_container(Arc::clone(&identity_a));
        let tag_a = container_a.buffer().iob_tag();

        container_a.ingest_message(make_message("msg_a", None, zx::BootInstant::from_nanos(1)));

        let mut buf = vec![0u8; 65536];
        let mut offset = 0;

        // Reads from `socket` until one complete record parses from the start of `buf`, returns
        // an owned copy of it together with its encoded length, and moves any bytes that follow
        // the record to the start of `buf` (updating `offset` accordingly).
        async fn read_one_record(
            socket: &mut fasync::Socket,
            buf: &mut [u8],
            offset: &mut usize,
        ) -> (diagnostics_log_encoding::Record<'static>, usize) {
            loop {
                if *offset > 0
                    && let Ok((record, rest)) = parse_record(&buf[..*offset])
                {
                    let len = *offset - rest.len();
                    // Take ownership before the buffer is shifted underneath the record.
                    let owned_record = record.into_owned();
                    buf.copy_within(len..*offset, 0);
                    *offset -= len;
                    return (owned_record, len);
                }
                let n = socket.read(&mut buf[*offset..]).await.expect("read");
                assert!(n > 0, "socket closed unexpectedly");
                *offset += n;
            }
        }

        // The first message of component A is preceded by its manifest record.
        let (manifest_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
        assert_eq!(manifest_a.arguments[0].value(), Value::Text("foo".into()));

        let (log_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
        assert_eq!(log_a.arguments[2].value(), Value::Text("msg_a".into()));

        // Retire component A.
        // NOTE(review): presumably this makes its tag eligible for reuse — confirm.
        container_a.mark_stopped();
        drop(container_a);

        let identity_filler = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./filler").unwrap(),
            "fuchsia-pkg://filler",
        ));
        let container_filler = repo.get_log_container(Arc::clone(&identity_filler));

        let identity_b = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        ));

        // Keep logging filler messages until a container created for component B is handed the
        // tag component A used.
        // NOTE(review): presumably the filler traffic is what lets the buffer release A's tag —
        // confirm.
        let mut container_b;
        loop {
            container_filler.ingest_message(make_message(
                "fill",
                None,
                zx::BootInstant::from_nanos(1),
            ));

            // Discard whatever is currently readable from the socket without waiting: the
            // future is polled once and the loop stops as soon as a read is not immediately
            // ready, returns zero bytes, or fails.
            loop {
                let mut temp_buf = [0u8; 1024];
                let read_fut = client_socket.read(&mut temp_buf);
                match futures::poll!(read_fut) {
                    std::task::Poll::Ready(Ok(n)) if n > 0 => {}
                    _ => break,
                }
            }

            container_b = repo.get_log_container(Arc::clone(&identity_b));
            if container_b.buffer().iob_tag() == tag_a {
                break;
            }

            // Different tag: retire this container and try again after a short pause.
            container_b.mark_stopped();
            drop(container_b);

            fasync::Timer::new(std::time::Duration::from_millis(10)).await;
        }

        container_b.ingest_message(make_message("msg_b", None, zx::BootInstant::from_nanos(2)));

        // Skip any other records still in flight until the manifest announcing component B
        // ("bar") shows up.
        loop {
            let (record, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;

            if !record.arguments.is_empty()
                && record.arguments[0].name() == "moniker"
                && record.arguments[0].value() == Value::Text("bar".into())
            {
                break;
            }
        }

        // The record right after B's manifest must be B's message.
        let (log_b, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
        assert_eq!(log_b.arguments[2].value(), Value::Text("msg_b".into()));
    }

    // Checks the byte layout appended by `extend_fxt_record` for each combination of options.
    // The function is always called with a rolled-out count of 42 and the unknown identity.
    #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
    #[test_case(
        ExtendRecordOpts { moniker: true, ..Default::default() },
        "UNKNOWN",
        "",
        0
        ; "with_moniker")]
    #[test_case(
        ExtendRecordOpts { component_url: true, ..Default::default() },
        "",
        "fuchsia-pkg://UNKNOWN",
        0
        ; "with_url")]
    #[test_case(
        ExtendRecordOpts { rolled_out: true, ..Default::default() },
        "",
        "",
        42
        ; "with_rolled_out")]
    #[test_case(
        ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true, subscribe_to_manifest: false },
        "UNKNOWN",
        "fuchsia-pkg://UNKNOWN",
        42
        ; "with_all")]
    #[fuchsia::test]
    fn extend_record_with_metadata(
        opts: ExtendRecordOpts,
        expected_moniker: &str,
        expected_url: &str,
        expected_rolled_out: u64,
    ) {
        let mut buffer = Vec::new();
        extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut buffer);

        // With no extension requested nothing may be appended.
        if !opts.should_extend() {
            assert!(buffer.is_empty());
            return;
        }

        // Fixed-size prefix: two little-endian u32 lengths followed by a little-endian u64.
        let moniker_len = u32::from_le_bytes(buffer[0..4].try_into().unwrap()) as usize;
        let component_url_len = u32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;

        let rolled_out = u64::from_le_bytes(buffer[8..16].try_into().unwrap());
        if opts.rolled_out {
            assert_eq!(rolled_out, expected_rolled_out);
        } else {
            assert_eq!(rolled_out, 0);
        }

        // The strings follow the prefix, each padded up to a multiple of 8 bytes.
        let mut offset = 16;
        let moniker = std::str::from_utf8(&buffer[offset..offset + moniker_len]).unwrap();
        assert_eq!(moniker, expected_moniker);
        let moniker_padded_len = (moniker_len + 7) & !7;
        offset += moniker_padded_len;

        let url = std::str::from_utf8(&buffer[offset..offset + component_url_len]).unwrap();
        assert_eq!(url, expected_url);
        let component_url_padded_len = (component_url_len + 7) & !7;
        offset += component_url_padded_len;

        // Nothing may follow the padded URL.
        assert_eq!(offset, buffer.len());
    }
}