Skip to main content

archivist_lib/logs/servers/
log_stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FilterCursor;
8use diagnostics_log_encoding::encode::{Encoder, EncoderOpts, ResizableBuffer};
9use diagnostics_log_encoding::{Argument, Header, LOG_CONTROL_BIT, MONIKER, Record, URL};
10use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker, RequestStream};
11use fidl_fuchsia_diagnostics as fdiagnostics;
12use fidl_fuchsia_diagnostics::StreamMode;
13use fidl_fuchsia_diagnostics_types::Severity;
14use fuchsia_async as fasync;
15use futures::{AsyncWriteExt, StreamExt};
16use log::warn;
17use std::collections::HashMap;
18use std::future::poll_fn;
19use std::io::Cursor;
20use std::pin::pin;
21use std::sync::Arc;
22use std::task::{Poll, ready};
23use zerocopy::{FromBytes, IntoBytes};
24
25#[derive(thiserror::Error, Debug)]
26enum StreamError {
27    #[error(transparent)]
28    Io(#[from] std::io::Error),
29}
30
31pub struct LogStreamServer {
32    /// The repository holding the logs.
33    logs_repo: Arc<LogsRepository>,
34
35    /// Scope in which we spawn all of the server tasks.
36    scope: fasync::Scope,
37}
38
39impl LogStreamServer {
40    pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
41        Self { logs_repo, scope }
42    }
43
44    /// Spawn a task to handle requests from components reading the shared log.
45    pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
46        let logs_repo = Arc::clone(&self.logs_repo);
47        let scope = self.scope.to_handle();
48        self.scope.spawn(async move {
49            if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
50                warn!("error handling Log requests: {}", e);
51            }
52        });
53    }
54
55    /// Handle requests to `fuchsia.diagnostics.LogStream`. All request types read the
56    /// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
57    async fn handle_requests(
58        logs_repo: Arc<LogsRepository>,
59        mut stream: fdiagnostics::LogStreamRequestStream,
60        scope: fasync::ScopeHandle,
61    ) -> Result<(), LogsError> {
62        while let Some(request) = stream.next().await {
63            let request = request.map_err(|source| LogsError::HandlingRequests {
64                protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
65                source,
66            })?;
67
68            match request {
69                fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
70                    let logs = logs_repo.logs_cursor_raw(
71                        opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
72                        Vec::new(),
73                    );
74                    let opts = ExtendRecordOpts::from(opts);
75                    if opts.subscribe_to_manifest {
76                        if opts.moniker || opts.component_url || opts.rolled_out {
77                            stream.control_handle().shutdown_with_epitaph(zx::Status::INVALID_ARGS);
78                            return Ok(());
79                        }
80
81                        scope.spawn(async move {
82                            let _ = Self::stream_logs_with_manifest(
83                                fasync::Socket::from_socket(socket),
84                                logs,
85                            )
86                            .await;
87                        });
88                    } else {
89                        scope.spawn(Self::stream_logs(
90                            fasync::Socket::from_socket(socket),
91                            logs,
92                            opts,
93                        ));
94                    }
95                }
96                fdiagnostics::LogStreamRequest::_UnknownMethod {
97                    ordinal,
98                    method_type,
99                    control_handle,
100                    ..
101                } => {
102                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
103                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
104                }
105            }
106        }
107        Ok(())
108    }
109
110    async fn stream_logs(mut socket: fasync::Socket, logs: FilterCursor, opts: ExtendRecordOpts) {
111        let mut logs = pin!(logs);
112        let mut buffer = Vec::new();
113        while let Some((identity, dropped)) = poll_fn(|cx| {
114            loop {
115                match ready!(logs.as_mut().poll_next(cx)) {
116                    Some(message) => {
117                        let (identity_ref, header, data) = match message.parse() {
118                            Ok(res) => res,
119                            Err(_) => {
120                                // If we fail to parse the message, just ignore it and move on
121                                // to the next message.
122                                continue;
123                            }
124                        };
125                        let identity = Arc::clone(identity_ref);
126                        buffer.clear();
127                        buffer.reserve(data.len());
128                        buffer.extend_from_slice(header.as_bytes());
129                        buffer.extend_from_slice(&data[8..]);
130                        return Poll::Ready(Some((identity, message.dropped)));
131                    }
132                    None => return Poll::Ready(None),
133                }
134            }
135        })
136        .await
137        {
138            extend_fxt_record(&identity, dropped, &opts, &mut buffer);
139            if socket.write_all(&buffer).await.is_err() {
140                // Assume an error means the peer closed for now.
141                break;
142            }
143        }
144    }
145
146    async fn stream_logs_with_manifest(
147        mut socket: fasync::Socket,
148        logs: FilterCursor,
149    ) -> Result<(), StreamError> {
150        let mut logs = pin!(logs);
151        let mut sent_tags = HashMap::new();
152        let mut buffer = Vec::new();
153        while let Some((identity, tag)) = poll_fn(|cx| {
154            loop {
155                match ready!(logs.as_mut().poll_next(cx)) {
156                    Some(message) => {
157                        let (identity_ref, header, data) = match message.parse() {
158                            Ok(res) => res,
159                            Err(_) => continue,
160                        };
161                        let identity = Arc::clone(identity_ref);
162                        buffer.clear();
163                        buffer.reserve(data.len());
164                        buffer.extend_from_slice(header.as_bytes());
165                        buffer.extend_from_slice(&data[8..]);
166                        return Poll::Ready(Some((identity, header.tag() as u64)));
167                    }
168                    None => return Poll::Ready(None),
169                }
170            }
171        })
172        .await
173        {
174            let send = match sent_tags.entry(tag) {
175                std::collections::hash_map::Entry::Vacant(e) => {
176                    e.insert(Arc::clone(&identity));
177                    true
178                }
179                std::collections::hash_map::Entry::Occupied(mut e) => {
180                    if !Arc::ptr_eq(e.get(), &identity) && **e.get() != *identity {
181                        e.insert(Arc::clone(&identity));
182                        true
183                    } else {
184                        false
185                    }
186                }
187            };
188
189            if send {
190                Self::send_component_change(&mut socket, tag, &identity).await?;
191            }
192            socket.write_all(&buffer).await?;
193        }
194        Ok(())
195    }
196
197    async fn send_component_change(
198        socket: &mut fasync::Socket,
199        id: u64,
200        identity: &ComponentIdentity,
201    ) -> Result<(), std::io::Error> {
202        let mut encoder =
203            Encoder::new(Cursor::new(ResizableBuffer::from(Vec::new())), EncoderOpts::default());
204        let record = Record {
205            timestamp: zx::BootInstant::from_nanos(0),
206            severity: Severity::Info.into_primitive(),
207            arguments: vec![
208                Argument::other(MONIKER, identity.moniker.to_string()),
209                Argument::other(URL, identity.url.as_str()),
210            ],
211        };
212        encoder.write_record(record).map_err(std::io::Error::other)?;
213
214        let mut buffer = encoder.take().into_inner().into_inner();
215        if buffer.len() >= 8 {
216            let mut header = Header::read_from_bytes(&buffer[0..8]).unwrap();
217            header.set_tag((id as u32) | LOG_CONTROL_BIT);
218            buffer[0..8].copy_from_slice(header.as_bytes());
219        }
220        socket.write_all(&buffer).await?;
221        Ok(())
222    }
223}
224
225#[derive(Default)]
226pub struct ExtendRecordOpts {
227    pub moniker: bool,
228    pub component_url: bool,
229    pub rolled_out: bool,
230    pub subscribe_to_manifest: bool,
231}
232
233impl ExtendRecordOpts {
234    fn should_extend(&self) -> bool {
235        let Self { moniker, component_url, rolled_out, subscribe_to_manifest: _ } = self;
236        *moniker || *component_url || *rolled_out
237    }
238}
239
240impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
241    fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
242        let fdiagnostics::LogStreamOptions {
243            include_moniker,
244            include_component_url,
245            include_rolled_out,
246            mode: _,
247            __source_breaking: _,
248            subscribe_to_manifest,
249        } = opts;
250        Self {
251            moniker: include_moniker.unwrap_or(false),
252            component_url: include_component_url.unwrap_or(false),
253            rolled_out: include_rolled_out.unwrap_or(false),
254            subscribe_to_manifest: subscribe_to_manifest.unwrap_or(false),
255        }
256    }
257}
258
259/// Returns zero padding for `len`.
260fn padding(len: usize) -> &'static [u8] {
261    &[0; 8][(len + 7) % 8 + 1..]
262}
263
264pub fn extend_fxt_record(
265    identity: &ComponentIdentity,
266    rolled_out: u64,
267    opts: &ExtendRecordOpts,
268    buffer: &mut Vec<u8>,
269) {
270    if !opts.should_extend() {
271        return;
272    }
273
274    let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
275    let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
276    let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
277
278    let moniker_len = moniker.len() as u32;
279    let component_url_len = component_url.len() as u32;
280
281    buffer.extend_from_slice(&moniker_len.to_le_bytes());
282    buffer.extend_from_slice(&component_url_len.to_le_bytes());
283    buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
284
285    buffer.extend_from_slice(moniker.as_bytes());
286    buffer.extend_from_slice(padding(moniker.len()));
287
288    buffer.extend_from_slice(component_url.as_bytes());
289    buffer.extend_from_slice(padding(component_url.len()));
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use crate::logs::testing::make_message;
296    use diagnostics_log_encoding::Value;
297    use diagnostics_log_encoding::parse::parse_record;
298    use futures::AsyncReadExt;
299    use moniker::ExtendedMoniker;
300    use test_case::test_case;
301    use zx;
302
303    #[fuchsia::test]
304    async fn log_stream_with_manifest() {
305        let repo = LogsRepository::for_test(fasync::Scope::new());
306        let identity = Arc::new(ComponentIdentity::new(
307            ExtendedMoniker::parse_str("./foo").unwrap(),
308            "fuchsia-pkg://foo",
309        ));
310        let container = repo.get_log_container(Arc::clone(&identity));
311        let container_tag = container.buffer().iob_tag() as u32;
312
313        let scope = fasync::Scope::new();
314        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
315        let (proxy, stream) =
316            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
317        server.spawn(stream);
318
319        let (client_socket, server_socket) = zx::Socket::create_stream();
320        let mut client_socket = fasync::Socket::from_socket(client_socket);
321
322        let opts = fdiagnostics::LogStreamOptions {
323            subscribe_to_manifest: Some(true),
324            mode: Some(StreamMode::SnapshotThenSubscribe),
325            ..Default::default()
326        };
327        proxy.connect(server_socket, &opts).expect("connect");
328
329        // Wait for connection to be established/handled?
330        // We can just ingest. The cursor should pick it up.
331        container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
332
333        let mut buf = vec![0u8; 4096];
334        let mut offset = 0;
335
336        // 1. Read Manifest
337        let (manifest_record, manifest_len) = loop {
338            if offset > 0
339                && let Ok((record, rest)) = parse_record(&buf[..offset])
340            {
341                let len = offset - rest.len();
342                break (record, len);
343            }
344            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
345            assert!(n > 0, "socket closed before receiving manifest");
346            offset += n;
347        };
348
349        // Check manifest arguments
350        assert_eq!(manifest_record.arguments[0].name(), "moniker");
351        assert_eq!(manifest_record.arguments[0].value(), Value::Text("foo".into()));
352        assert_eq!(manifest_record.arguments[1].name(), "url");
353        assert_eq!(manifest_record.arguments[1].value(), Value::Text("fuchsia-pkg://foo".into()));
354
355        // 2. Read Log Record
356        let (log_record, _log_len) = loop {
357            if offset > manifest_len
358                && let Ok((record, rest)) = parse_record(&buf[manifest_len..offset])
359            {
360                let len = offset - manifest_len - rest.len();
361                break (record, len);
362            }
363            let n = client_socket.read(&mut buf[offset..]).await.expect("read");
364            assert!(n > 0, "socket closed before receiving log record");
365            offset += n;
366        };
367
368        // Verify header tag of first record (Manifest)
369        let header1 = Header::read_from_bytes(&buf[0..8]).unwrap();
370        let tag_id = header1.tag();
371        assert_ne!(tag_id & LOG_CONTROL_BIT, 0, "Manifest should have LOG_CONTROL_BIT set");
372
373        // Verify header tag of second record (Log)
374        let header2 = Header::read_from_bytes(&buf[manifest_len..manifest_len + 8]).unwrap();
375        assert_eq!(
376            header2.tag() & LOG_CONTROL_BIT,
377            0,
378            "Log record should NOT have LOG_CONTROL_BIT set"
379        );
380        assert_eq!(
381            header2.tag(),
382            tag_id & !LOG_CONTROL_BIT,
383            "Log record tag should match Manifest tag ID"
384        );
385        assert_eq!(header2.tag(), container_tag, "Log record tag ID should equal IOB tag ID");
386
387        assert_eq!(log_record.arguments[2].value(), Value::Text("a".into()));
388    }
389
390    #[fuchsia::test]
391    async fn log_stream_with_manifest_reused_tag() {
392        use crate::logs::shared_buffer::create_ring_buffer;
393        // Use a small buffer to facilitate rolling out logs.
394        let repo = LogsRepository::new(
395            create_ring_buffer(65536),
396            std::iter::empty(),
397            &Default::default(),
398            fasync::Scope::new(),
399        );
400
401        let scope = fasync::Scope::new();
402        let server = Arc::new(LogStreamServer::new(Arc::clone(&repo), scope));
403        let (proxy, stream) =
404            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::LogStreamMarker>();
405        server.spawn(stream);
406
407        let (client_socket, server_socket) = zx::Socket::create_stream();
408        let mut client_socket = fasync::Socket::from_socket(client_socket);
409
410        let opts = fdiagnostics::LogStreamOptions {
411            subscribe_to_manifest: Some(true),
412            mode: Some(StreamMode::SnapshotThenSubscribe),
413            ..Default::default()
414        };
415        proxy.connect(server_socket, &opts).expect("connect");
416
417        // 1. Setup Identity A
418        let identity_a = Arc::new(ComponentIdentity::new(
419            ExtendedMoniker::parse_str("./foo").unwrap(),
420            "fuchsia-pkg://foo",
421        ));
422        let container_a = repo.get_log_container(Arc::clone(&identity_a));
423        let tag_a = container_a.buffer().iob_tag();
424
425        // 2. Ingest A
426        container_a.ingest_message(make_message("msg_a", None, zx::BootInstant::from_nanos(1)));
427
428        let mut buf = vec![0u8; 65536];
429        let mut offset = 0;
430
431        // Helper to read one record from the socket
432        async fn read_one_record(
433            socket: &mut fasync::Socket,
434            buf: &mut [u8],
435            offset: &mut usize,
436        ) -> (diagnostics_log_encoding::Record<'static>, usize) {
437            loop {
438                if *offset > 0
439                    && let Ok((record, rest)) = parse_record(&buf[..*offset])
440                {
441                    let len = *offset - rest.len();
442                    let owned_record = record.into_owned();
443                    // Shift buffer
444                    buf.copy_within(len..*offset, 0);
445                    *offset -= len;
446                    // Return owned record to avoid lifetime issues
447                    return (owned_record, len);
448                }
449                let n = socket.read(&mut buf[*offset..]).await.expect("read");
450                assert!(n > 0, "socket closed unexpectedly");
451                *offset += n;
452            }
453        }
454
455        // 3. Read Manifest A and Log A
456        let (manifest_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
457        assert_eq!(manifest_a.arguments[0].value(), Value::Text("foo".into()));
458
459        let (log_a, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
460        assert_eq!(log_a.arguments[2].value(), Value::Text("msg_a".into()));
461
462        // 4. Mark A inactive and release
463        container_a.mark_stopped();
464        drop(container_a);
465
466        // 5. Force rollout A by ingesting filler logs
467        let identity_filler = Arc::new(ComponentIdentity::new(
468            ExtendedMoniker::parse_str("./filler").unwrap(),
469            "fuchsia-pkg://filler",
470        ));
471        let container_filler = repo.get_log_container(Arc::clone(&identity_filler));
472
473        let identity_b = Arc::new(ComponentIdentity::new(
474            ExtendedMoniker::parse_str("./bar").unwrap(),
475            "fuchsia-pkg://bar",
476        ));
477
478        let mut container_b;
479        loop {
480            // Ingest filler
481            container_filler.ingest_message(make_message(
482                "fill",
483                None,
484                zx::BootInstant::from_nanos(1),
485            ));
486
487            // Drain socket to ensure flow
488
489            // Read available data
490            loop {
491                let mut temp_buf = [0u8; 1024];
492                // Use poll_read to not block
493                let read_fut = client_socket.read(&mut temp_buf);
494                match futures::poll!(read_fut) {
495                    std::task::Poll::Ready(Ok(n)) if n > 0 => {
496                        // We just discard filler data for now, but we need to watch for B?
497                        // No, B hasn't been created/ingested yet.
498                        // We are just draining filler.
499                    }
500                    _ => break, // No more data or pending
501                }
502            }
503
504            // Try to allocate B
505            container_b = repo.get_log_container(Arc::clone(&identity_b));
506            if container_b.buffer().iob_tag() == tag_a {
507                break;
508            }
509
510            // Failed to reuse, clean up B
511            container_b.mark_stopped();
512            drop(container_b);
513
514            // Yield to let cleanup tasks run
515            fasync::Timer::new(std::time::Duration::from_millis(10)).await;
516        }
517
518        // 6. Ingest B
519        container_b.ingest_message(make_message("msg_b", None, zx::BootInstant::from_nanos(2)));
520
521        // 7. Read Manifest B + Log B
522        // Note: Our buffer `buf` might contain leftover filler data or partial records.
523        // But we discarded filler data in the loop above (into temp_buf).
524        // `buf` and `offset` state from `read_one_record` was preserved.
525        // Wait, `read_one_record` modifies `buf` and `offset` (shifts data).
526        // So `buf` should be clean or contain partial data.
527
528        // We need to read until we find Manifest B.
529        loop {
530            let (record, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
531
532            // Is it Manifest B?
533            if !record.arguments.is_empty()
534                && record.arguments[0].name() == "moniker"
535                && record.arguments[0].value() == Value::Text("bar".into())
536            {
537                // Found Manifest B!
538                break;
539            }
540            // Otherwise it's filler log or Manifest Filler
541        }
542
543        // Next should be Log B
544        let (log_b, _) = read_one_record(&mut client_socket, &mut buf, &mut offset).await;
545        assert_eq!(log_b.arguments[2].value(), Value::Text("msg_b".into()));
546    }
547
548    #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
549    #[test_case(
550        ExtendRecordOpts { moniker: true, ..Default::default() },
551        "UNKNOWN",
552        "",
553        0
554        ; "with_moniker")]
555    #[test_case(
556        ExtendRecordOpts { component_url: true, ..Default::default() },
557        "",
558        "fuchsia-pkg://UNKNOWN",
559        0
560        ; "with_url")]
561    #[test_case(
562        ExtendRecordOpts { rolled_out: true, ..Default::default() },
563        "",
564        "",
565        42
566        ; "with_rolled_out")]
567    #[test_case(
568        ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true, subscribe_to_manifest: false },
569        "UNKNOWN",
570        "fuchsia-pkg://UNKNOWN",
571        42
572        ; "with_all")]
573    #[fuchsia::test]
574    fn extend_record_with_metadata(
575        opts: ExtendRecordOpts,
576        expected_moniker: &str,
577        expected_url: &str,
578        expected_rolled_out: u64,
579    ) {
580        let mut buffer = Vec::new();
581        extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut buffer);
582
583        if !opts.should_extend() {
584            assert!(buffer.is_empty());
585            return;
586        }
587
588        let moniker_len = u32::from_le_bytes(buffer[0..4].try_into().unwrap()) as usize;
589        let component_url_len = u32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
590
591        let rolled_out = u64::from_le_bytes(buffer[8..16].try_into().unwrap());
592        if opts.rolled_out {
593            assert_eq!(rolled_out, expected_rolled_out);
594        } else {
595            assert_eq!(rolled_out, 0);
596        }
597
598        let mut offset = 16;
599        let moniker = std::str::from_utf8(&buffer[offset..offset + moniker_len]).unwrap();
600        assert_eq!(moniker, expected_moniker);
601        let moniker_padded_len = (moniker_len + 7) & !7;
602        offset += moniker_padded_len;
603
604        let url = std::str::from_utf8(&buffer[offset..offset + component_url_len]).unwrap();
605        assert_eq!(url, expected_url);
606        let component_url_padded_len = (component_url_len + 7) & !7;
607        offset += component_url_padded_len;
608
609        assert_eq!(offset, buffer.len());
610    }
611}