archivist_lib/logs/servers/
log_stream.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FxtMessage;
8use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
9use fidl_fuchsia_diagnostics::StreamMode;
10use futures::{AsyncWriteExt, Stream, StreamExt};
11use log::warn;
12use std::pin::pin;
13use std::sync::Arc;
14use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
15
/// Server for the `fuchsia.diagnostics.LogStream` protocol.
///
/// Holds the log repository that connections read from and the scope on which every
/// per-connection task is spawned.
pub struct LogStreamServer {
    /// The repository holding the logs.
    logs_repo: Arc<LogsRepository>,

    /// Scope in which we spawn all of the server tasks.
    scope: fasync::Scope,
}
23
24impl LogStreamServer {
25    pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
26        Self { logs_repo, scope }
27    }
28
29    /// Spawn a task to handle requests from components reading the shared log.
30    pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
31        let logs_repo = Arc::clone(&self.logs_repo);
32        let scope = self.scope.to_handle();
33        self.scope.spawn(async move {
34            if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
35                warn!("error handling Log requests: {}", e);
36            }
37        });
38    }
39
40    /// Handle requests to `fuchsia.diagnostics.LogStream`. All request types read the
41    /// whole backlog from memory, `DumpLogs(Safe)` stops listening after that.
42    async fn handle_requests(
43        logs_repo: Arc<LogsRepository>,
44        mut stream: fdiagnostics::LogStreamRequestStream,
45        scope: fasync::ScopeHandle,
46    ) -> Result<(), LogsError> {
47        while let Some(request) = stream.next().await {
48            let request = request.map_err(|source| LogsError::HandlingRequests {
49                protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
50                source,
51            })?;
52
53            match request {
54                fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
55                    let logs = logs_repo.logs_cursor_raw(
56                        opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
57                        Vec::new(),
58                    );
59                    let opts = ExtendRecordOpts::from(opts);
60                    scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
61                }
62                fdiagnostics::LogStreamRequest::_UnknownMethod {
63                    ordinal,
64                    method_type,
65                    control_handle,
66                    ..
67                } => {
68                    warn!(ordinal, method_type:?; "Unknown request. Closing connection");
69                    control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
70                }
71            }
72        }
73        Ok(())
74    }
75
76    async fn stream_logs(
77        mut socket: fasync::Socket,
78        logs: impl Stream<Item = FxtMessage>,
79        opts: ExtendRecordOpts,
80    ) {
81        let mut logs = pin!(logs);
82        let mut buffer = Vec::new();
83        while let Some(message) = logs.next().await {
84            buffer.clear();
85            buffer.extend_from_slice(message.data());
86            extend_fxt_record(message.component_identity(), message.dropped(), &opts, &mut buffer);
87            let result = socket.write_all(&buffer).await;
88            if result.is_err() {
89                // Assume an error means the peer closed for now.
90                break;
91            }
92        }
93    }
94}
95
/// Selects which pieces of metadata are appended to each streamed log record.
///
/// The default value requests no additional metadata.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct ExtendRecordOpts {
    /// Append the moniker of the component that emitted the record.
    pub moniker: bool,
    /// Append the URL of the component that emitted the record.
    pub component_url: bool,
    /// Append the rolled-out count supplied alongside the record.
    pub rolled_out: bool,
}
102
103impl ExtendRecordOpts {
104    fn should_extend(&self) -> bool {
105        let Self { moniker, component_url, rolled_out } = self;
106        *moniker || *component_url || *rolled_out
107    }
108}
109
110impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
111    fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
112        let fdiagnostics::LogStreamOptions {
113            include_moniker,
114            include_component_url,
115            include_rolled_out,
116            mode: _,
117            __source_breaking: _,
118        } = opts;
119        Self {
120            moniker: include_moniker.unwrap_or(false),
121            component_url: include_component_url.unwrap_or(false),
122            rolled_out: include_rolled_out.unwrap_or(false),
123        }
124    }
125}
126
/// Returns the zero bytes needed to pad `len` bytes up to the next multiple of eight.
///
/// The returned slice is empty when `len` is already a multiple of eight and is
/// otherwise between one and seven bytes long.
fn padding(len: usize) -> &'static [u8] {
    static ZEROS: [u8; 8] = [0; 8];
    // `len % 8` is reduced before anything is added, so this cannot overflow for any
    // `len`, unlike rounding up with `len + 7`.
    &ZEROS[..(8 - len % 8) % 8]
}
131
132pub fn extend_fxt_record(
133    identity: &ComponentIdentity,
134    rolled_out: u64,
135    opts: &ExtendRecordOpts,
136    buffer: &mut Vec<u8>,
137) {
138    if !opts.should_extend() {
139        return;
140    }
141
142    let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
143    let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
144    let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
145
146    let moniker_len = moniker.len() as u32;
147    let component_url_len = component_url.len() as u32;
148
149    buffer.extend_from_slice(&moniker_len.to_le_bytes());
150    buffer.extend_from_slice(&component_url_len.to_le_bytes());
151    buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
152
153    buffer.extend_from_slice(moniker.as_bytes());
154    buffer.extend_from_slice(padding(moniker.len()));
155
156    buffer.extend_from_slice(component_url.as_bytes());
157    buffer.extend_from_slice(padding(component_url.len()));
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use test_case::test_case;

    /// Size of the fixed header: two `u32` lengths followed by one `u64` count.
    const HEADER_LEN: usize = 16;

    /// Rounds `len` up to the next multiple of eight.
    fn round_up_to_8(len: usize) -> usize {
        (len + 7) & !7
    }

    #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
    #[test_case(
        ExtendRecordOpts { moniker: true, ..Default::default() },
        "UNKNOWN",
        "",
        0
        ; "with_moniker")]
    #[test_case(
        ExtendRecordOpts { component_url: true, ..Default::default() },
        "",
        "fuchsia-pkg://UNKNOWN",
        0
        ; "with_url")]
    #[test_case(
        ExtendRecordOpts { rolled_out: true, ..Default::default() },
        "",
        "",
        42
        ; "with_rolled_out")]
    #[test_case(
        ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true },
        "UNKNOWN",
        "fuchsia-pkg://UNKNOWN",
        42
        ; "with_all")]
    #[fuchsia::test]
    fn extend_record_with_metadata(
        opts: ExtendRecordOpts,
        expected_moniker: &str,
        expected_url: &str,
        expected_rolled_out: u64,
    ) {
        let mut record = Vec::new();
        extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut record);

        // With nothing requested the record must be left untouched.
        if !opts.should_extend() {
            assert!(record.is_empty());
            return;
        }

        // Decode the fixed-size header.
        let moniker_len = u32::from_le_bytes(record[0..4].try_into().unwrap()) as usize;
        let url_len = u32::from_le_bytes(record[4..8].try_into().unwrap()) as usize;
        let rolled_out = u64::from_le_bytes(record[8..HEADER_LEN].try_into().unwrap());

        // A count that was not requested is encoded as zero.
        let want_rolled_out = if opts.rolled_out { expected_rolled_out } else { 0 };
        assert_eq!(rolled_out, want_rolled_out);

        // The moniker starts right after the header; the URL follows the padded moniker.
        let moniker_start = HEADER_LEN;
        let moniker =
            std::str::from_utf8(&record[moniker_start..moniker_start + moniker_len]).unwrap();
        assert_eq!(moniker, expected_moniker);

        let url_start = moniker_start + round_up_to_8(moniker_len);
        let url = std::str::from_utf8(&record[url_start..url_start + url_len]).unwrap();
        assert_eq!(url, expected_url);

        // Nothing may follow the padded URL.
        let end = url_start + round_up_to_8(url_len);
        assert_eq!(end, record.len());
    }
}