archivist_lib/logs/servers/
log_stream.rs1use crate::identity::ComponentIdentity;
5use crate::logs::error::LogsError;
6use crate::logs::repository::LogsRepository;
7use crate::logs::shared_buffer::FxtMessage;
8use fidl::endpoints::{ControlHandle, DiscoverableProtocolMarker};
9use fidl_fuchsia_diagnostics::StreamMode;
10use futures::{AsyncWriteExt, Stream, StreamExt};
11use log::warn;
12use std::pin::pin;
13use std::sync::Arc;
14use {fidl_fuchsia_diagnostics as fdiagnostics, fuchsia_async as fasync};
15
16pub struct LogStreamServer {
17 logs_repo: Arc<LogsRepository>,
19
20 scope: fasync::Scope,
22}
23
24impl LogStreamServer {
25 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
26 Self { logs_repo, scope }
27 }
28
29 pub fn spawn(&self, stream: fdiagnostics::LogStreamRequestStream) {
31 let logs_repo = Arc::clone(&self.logs_repo);
32 let scope = self.scope.to_handle();
33 self.scope.spawn(async move {
34 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
35 warn!("error handling Log requests: {}", e);
36 }
37 });
38 }
39
40 async fn handle_requests(
43 logs_repo: Arc<LogsRepository>,
44 mut stream: fdiagnostics::LogStreamRequestStream,
45 scope: fasync::ScopeHandle,
46 ) -> Result<(), LogsError> {
47 while let Some(request) = stream.next().await {
48 let request = request.map_err(|source| LogsError::HandlingRequests {
49 protocol: fdiagnostics::LogStreamMarker::PROTOCOL_NAME,
50 source,
51 })?;
52
53 match request {
54 fdiagnostics::LogStreamRequest::Connect { socket, opts, .. } => {
55 let logs = logs_repo.logs_cursor_raw(
56 opts.mode.unwrap_or(StreamMode::SnapshotThenSubscribe),
57 Vec::new(),
58 );
59 let opts = ExtendRecordOpts::from(opts);
60 scope.spawn(Self::stream_logs(fasync::Socket::from_socket(socket), logs, opts));
61 }
62 fdiagnostics::LogStreamRequest::_UnknownMethod {
63 ordinal,
64 method_type,
65 control_handle,
66 ..
67 } => {
68 warn!(ordinal, method_type:?; "Unknown request. Closing connection");
69 control_handle.shutdown_with_epitaph(zx::Status::UNAVAILABLE);
70 }
71 }
72 }
73 Ok(())
74 }
75
76 async fn stream_logs(
77 mut socket: fasync::Socket,
78 logs: impl Stream<Item = FxtMessage>,
79 opts: ExtendRecordOpts,
80 ) {
81 let mut logs = pin!(logs);
82 let mut buffer = Vec::new();
83 while let Some(message) = logs.next().await {
84 buffer.clear();
85 buffer.extend_from_slice(message.data());
86 extend_fxt_record(message.component_identity(), message.dropped(), &opts, &mut buffer);
87 let result = socket.write_all(&buffer).await;
88 if result.is_err() {
89 break;
91 }
92 }
93 }
94}
95
96#[derive(Default)]
97pub struct ExtendRecordOpts {
98 pub moniker: bool,
99 pub component_url: bool,
100 pub rolled_out: bool,
101}
102
103impl ExtendRecordOpts {
104 fn should_extend(&self) -> bool {
105 let Self { moniker, component_url, rolled_out } = self;
106 *moniker || *component_url || *rolled_out
107 }
108}
109
110impl From<fdiagnostics::LogStreamOptions> for ExtendRecordOpts {
111 fn from(opts: fdiagnostics::LogStreamOptions) -> Self {
112 let fdiagnostics::LogStreamOptions {
113 include_moniker,
114 include_component_url,
115 include_rolled_out,
116 mode: _,
117 __source_breaking: _,
118 } = opts;
119 Self {
120 moniker: include_moniker.unwrap_or(false),
121 component_url: include_component_url.unwrap_or(false),
122 rolled_out: include_rolled_out.unwrap_or(false),
123 }
124 }
125}
126
127fn padding(len: usize) -> &'static [u8] {
129 &[0; 8][(len + 7) % 8 + 1..]
130}
131
132pub fn extend_fxt_record(
133 identity: &ComponentIdentity,
134 rolled_out: u64,
135 opts: &ExtendRecordOpts,
136 buffer: &mut Vec<u8>,
137) {
138 if !opts.should_extend() {
139 return;
140 }
141
142 let moniker = if opts.moniker { identity.moniker.as_ref() } else { "" };
143 let component_url = if opts.component_url { identity.url.as_ref() } else { "" };
144 let rolled_out_value = if opts.rolled_out { rolled_out } else { 0 };
145
146 let moniker_len = moniker.len() as u32;
147 let component_url_len = component_url.len() as u32;
148
149 buffer.extend_from_slice(&moniker_len.to_le_bytes());
150 buffer.extend_from_slice(&component_url_len.to_le_bytes());
151 buffer.extend_from_slice(&rolled_out_value.to_le_bytes());
152
153 buffer.extend_from_slice(moniker.as_bytes());
154 buffer.extend_from_slice(padding(moniker.len()));
155
156 buffer.extend_from_slice(component_url.as_bytes());
157 buffer.extend_from_slice(padding(component_url.len()));
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use test_case::test_case;
164
165 #[test_case(ExtendRecordOpts::default(), "", "", 0 ; "no_additional_metadata")]
166 #[test_case(
167 ExtendRecordOpts { moniker: true, ..Default::default() },
168 "UNKNOWN",
169 "",
170 0
171 ; "with_moniker")]
172 #[test_case(
173 ExtendRecordOpts { component_url: true, ..Default::default() },
174 "",
175 "fuchsia-pkg://UNKNOWN",
176 0
177 ; "with_url")]
178 #[test_case(
179 ExtendRecordOpts { rolled_out: true, ..Default::default() },
180 "",
181 "",
182 42
183 ; "with_rolled_out")]
184 #[test_case(
185 ExtendRecordOpts { moniker: true, component_url: true, rolled_out: true },
186 "UNKNOWN",
187 "fuchsia-pkg://UNKNOWN",
188 42
189 ; "with_all")]
190 #[fuchsia::test]
191 fn extend_record_with_metadata(
192 opts: ExtendRecordOpts,
193 expected_moniker: &str,
194 expected_url: &str,
195 expected_rolled_out: u64,
196 ) {
197 let mut buffer = Vec::new();
198 extend_fxt_record(&ComponentIdentity::unknown(), 42, &opts, &mut buffer);
199
200 if !opts.should_extend() {
201 assert!(buffer.is_empty());
202 return;
203 }
204
205 let moniker_len = u32::from_le_bytes(buffer[0..4].try_into().unwrap()) as usize;
206 let component_url_len = u32::from_le_bytes(buffer[4..8].try_into().unwrap()) as usize;
207
208 let rolled_out = u64::from_le_bytes(buffer[8..16].try_into().unwrap());
209 if opts.rolled_out {
210 assert_eq!(rolled_out, expected_rolled_out);
211 } else {
212 assert_eq!(rolled_out, 0);
213 }
214
215 let mut offset = 16;
216 let moniker = std::str::from_utf8(&buffer[offset..offset + moniker_len]).unwrap();
217 assert_eq!(moniker, expected_moniker);
218 let moniker_padded_len = (moniker_len + 7) & !7;
219 offset += moniker_padded_len;
220
221 let url = std::str::from_utf8(&buffer[offset..offset + component_url_len]).unwrap();
222 assert_eq!(url, expected_url);
223 let component_url_padded_len = (component_url_len + 7) & !7;
224 offset += component_url_padded_len;
225
226 assert_eq!(offset, buffer.len());
227 }
228}