use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
use crate::error::AccessorError;
use crate::formatter::{
    FXTPacketSerializer, FormattedStream, JsonPacketSerializer, SerializedVmo, new_batcher,
};
use crate::inspect::ReaderServer;
use crate::inspect::repository::InspectRepository;
use crate::logs::container::CursorItem;
use crate::logs::repository::LogsRepository;
use crate::pipeline::Pipeline;
use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
use fidl::endpoints::{ControlHandle, RequestStream};
use fidl_fuchsia_diagnostics::{
    ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
    BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration, DataType,
    Format, FormattedContent, PerformanceConfiguration, Selector, SelectorArgument, StreamMode,
    StreamParameters, StringSelector, TreeSelector, TreeSelectorUnknown,
};
use fidl_fuchsia_mem::Buffer;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::future::{Either, select};
use futures::prelude::*;
use futures::stream::Peekable;
use futures::{StreamExt, pin_mut};
use log::warn;
use selectors::FastError;
use serde::Serialize;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use thiserror::Error;
use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};

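/// Timeout for retrieving a single batch of results. A non-positive value
/// (see [`BatchRetrievalTimeout::max`]) is treated as "no timeout".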
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    pub fn from_seconds(s: i64) -> Self {
        Self(s)
    }

    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    pub fn seconds(&self) -> i64 {
        if self.0 > 0 { self.0 } else { i64::MAX }
    }
}

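/// Serves `fuchsia.diagnostics.ArchiveAccessor` connections, dispatching each
/// stream request to a `BatchIterator` backed by the Inspect or logs repository.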
pub struct ArchiveAccessorServer {
    inspect_repository: Arc<InspectRepository>,
    logs_repository: Arc<LogsRepository>,
    maximum_concurrent_snapshots_per_reader: u64,
    scope: fasync::Scope,
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}

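/// Parses selector arguments, returning an error if the list is empty.
/// Individual selectors that fail to parse are logged and skipped.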
fn validate_and_parse_selectors(
    selector_args: Vec<SelectorArgument>,
) -> Result<Vec<Selector>, AccessorError> {
    let mut selectors = vec![];
    let mut errors = vec![];

    if selector_args.is_empty() {
        return Err(AccessorError::EmptySelectors);
    }

    for selector_arg in selector_args {
        match selectors::take_from_argument::<FastError>(selector_arg) {
            Ok(s) => selectors.push(s),
            Err(e) => errors.push(e),
        }
    }

    if !errors.is_empty() {
        warn!(errors:?; "Found errors in selector arguments");
    }

    Ok(selectors)
}

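/// Parses selectors for a log stream. On top of the base validation, log
/// selectors may only target a component's `root` node.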
fn validate_and_parse_log_selectors(
    selector_args: Vec<SelectorArgument>,
) -> Result<Vec<Selector>, AccessorError> {
    let selectors = validate_and_parse_selectors(selector_args)?;
    for selector in &selectors {
        // Unwrap is safe: validated selectors always carry a tree selector.
        let tree_selector = selector.tree_selector.as_ref().unwrap();
        // Logs only support selecting the `root` node of a component.
        match tree_selector {
            TreeSelector::PropertySelector(_) => {
                return Err(AccessorError::InvalidLogSelector);
            }
            TreeSelector::SubtreeSelector(subtree_selector) => {
                if subtree_selector.node_path.len() != 1 {
                    return Err(AccessorError::InvalidLogSelector);
                }
                match &subtree_selector.node_path[0] {
                    StringSelector::ExactMatch(val) if val == "root" => {}
                    StringSelector::StringPattern(val) if val == "root" => {}
                    _ => {
                        return Err(AccessorError::InvalidLogSelector);
                    }
                }
            }
            TreeSelectorUnknown!() => {}
        }
    }
    Ok(selectors)
}

impl ArchiveAccessorServer {
    /// Creates a new instance of the accessor server, whose work is spawned
    /// on the given `scope`.
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                if format == Format::Cbor {
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        Some(validate_and_parse_log_selectors(selectors)?)
                    }
                    Some(ClientSelectorConfiguration::SelectAll(_)) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        let logs = log_repo.logs_cursor_raw(mode, selectors, trace_id);
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = log_repo
                            .logs_cursor(mode, selectors, trace_id)
                            .map(move |inner: _| (*inner).clone());
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

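    /// Spawns a task on this server's scope to serve the given accessor
    /// request stream; each StreamDiagnostics request is then served on its
    /// own child task.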
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                        && let Some(control) = control_handle
                    {
                        e.close(control);
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}

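/// Abstraction over the channel on which formatted results are written back
/// to the client: a `BatchIterator` request stream or a host-side socket.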
pub trait ArchiveAccessorWriter {
    /// Writes a batch of results to the client.
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    /// Waits until the client is ready for more data. Defaults to
    /// immediately ready.
    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Returns the control handle backing this connection, if one exists.
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    /// Responds to a pending WaitForReady request, if one is queued.
    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Waits for the peer to close its end of the connection.
    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}

fn get_buffer_from_formatted_content(
    content: fidl_fuchsia_diagnostics::FormattedContent,
) -> Result<Buffer, AccessorError> {
    match content {
        FormattedContent::Json(json) => Ok(json),
        FormattedContent::Text(text) => Ok(text),
        _ => Err(AccessorError::UnsupportedFormat),
    }
}

impl ArchiveAccessorWriter for fuchsia_async::Socket {
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        if data.is_empty() {
            return Err(IteratorError::PeerClosed);
        }
        let mut buf = vec![0];
        for value in data {
            let data = get_buffer_from_formatted_content(value)?;
            buf.resize(data.size as usize, 0);
            data.vmo.read(&mut buf, 0)?;
            let res = self.write_all(&buf).await;
            if res.is_err() {
                break;
            }
        }
        Ok(())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.on_closed().await;
    }
}

#[derive(Error, Debug)]
pub enum IteratorError {
    #[error("Peer closed")]
    PeerClosed,
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}

impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}

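/// A single StreamDiagnostics request: the stream parameters plus the channel
/// on which results will be served.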
pub struct ArchiveIteratorRequest<R> {
    parameters: StreamParameters,
    iterator: R,
}

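/// Adapts the `fuchsia.diagnostics` and `fuchsia.diagnostics.host` accessor
/// protocols into a common request type.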
pub trait ArchiveAccessorTranslator {
    /// The per-request channel over which results are written back to the client.
    type InnerDataRequestChannel;
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}

impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
    type InnerDataRequestChannel = fuchsia_async::Socket;

    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
        match StreamExt::next(self).await {
            Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
                parameters,
                responder,
                stream,
            })) => {
                // Acknowledge the request. If the client already went away,
                // there is nothing else to do.
                let _ = responder.send();
                Some(ArchiveIteratorRequest {
                    iterator: fuchsia_async::Socket::from_socket(stream),
                    parameters,
                })
            }
            _ => None,
        }
    }
}

impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
    type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;

    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
        loop {
            match StreamExt::next(self).await {
                Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
                    control_handle: _,
                    result_stream,
                    stream_parameters,
                })) => {
                    return Some(ArchiveIteratorRequest {
                        iterator: result_stream.into_stream().peekable(),
                        parameters: stream_parameters,
                    });
                }
                Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
                    let _ = responder.send();
                }
                _ => return None,
            }
        }
    }
}

struct SchemaTruncationCounter {
    truncated_schemas: u64,
    total_schemas: u64,
}

impl SchemaTruncationCounter {
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
    }
}

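/// Serves batches of formatted diagnostics data over a client connection
/// until the data stream ends or the client goes away.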
pub(crate) struct BatchIterator<R> {
    requests: R,
    stats: Arc<BatchIteratorConnectionStats>,
    data: FormattedStream,
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    parent_trace_id: ftrace::Id,
}

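/// Checks whether `bytes` more bytes still fit within `byte_limit` for the
/// component identified by `moniker`, updating the running total in
/// `budget_map`. Returns false when the schema should be truncated instead.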
fn maybe_update_budget(
    budget_map: &mut HashMap<ExtendedMoniker, usize>,
    moniker: &ExtendedMoniker,
    bytes: usize,
    byte_limit: usize,
) -> bool {
    if let Some(remaining_budget) = budget_map.get_mut(moniker) {
        if *remaining_budget + bytes > byte_limit {
            false
        } else {
            *remaining_budget += bytes;
            true
        }
    } else if bytes > byte_limit {
        budget_map.insert(moniker.clone(), 0);
        false
    } else {
        budget_map.insert(moniker.clone(), bytes);
        true
    }
}

impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
                                    d.drop_payload();
                                    // Over budget: re-serialize the schema
                                    // without its payload.
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = CursorItem> + Send + Unpin + 'static,
    {
        let data = FXTPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            let batch = {
                let wait_for_close = self.requests.wait_for_close();
                let next_data = self.data.next();
                pin_mut!(wait_for_close);
                match select(next_data, wait_for_close).await {
                    // If the data stream ended, fall back to an empty batch.
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    // The client closed the connection; stop serving.
                    Either::Right(_) => break,
                }
            };

            // Turn the batch into a FIDL-ready vector, failing on any error.
            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            self.stats.add_response();

            // An empty batch marks the end of the stream; record final stats.
            if batch.is_empty() {
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                // The peer is gone; stop sending batches.
                break;
            }
        }
        Ok(())
    }
}

impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        self.stats.close_connection();
    }
}

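/// Performance settings for a single reader connection, resolved from the
/// client-provided `StreamParameters` and server-wide defaults.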
pub struct PerformanceConfig {
    pub batch_timeout_sec: i64,
    pub aggregated_content_limit_bytes: Option<u64>,
    pub maximum_concurrent_snapshots_per_reader: u64,
}

impl PerformanceConfig {
    pub fn new(
        params: &StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<PerformanceConfig, AccessorError> {
        let batch_timeout = match params {
            // If the timeout is specified in at most one place, use it.
            StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration: None,
                ..
            }
            | StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
            StreamParameters {
                batch_retrieval_timeout_seconds: None,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
            // Specifying the timeout in both places is ambiguous; reject it.
            _ => return Err(AccessorError::DuplicateBatchTimeout),
        }
        .map(BatchRetrievalTimeout::from_seconds)
        .unwrap_or(default_batch_timeout_seconds);

        let aggregated_content_limit_bytes = match params {
            StreamParameters {
                performance_configuration:
                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
                ..
            } => *max_aggregate_content_size_bytes,
            _ => None,
        };

        Ok(PerformanceConfig {
            batch_timeout_sec: batch_timeout.seconds(),
            aggregated_content_limit_bytes,
            maximum_concurrent_snapshots_per_reader,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use crate::logs::shared_buffer::create_ring_buffer;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};
    use zx::AsHandleRef;

    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // A property selector is not valid for logs: the connection should be
        // closed with INVALID_ARGS.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

        // A `component:root` selector is accepted for logs.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.get_next().await.is_ok());
    }

    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        // The invalid selector is logged and skipped; the valid one is served.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Inspect),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("invalid".to_string()),
                                SelectorArgument::RawSelector("valid:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.get_next().await.is_ok());
    }

    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Text(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    #[fuchsia::test]
    fn socket_writer_handles_text() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.get_koid().unwrap();
        let text = FormattedContent::Text(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.get_koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            // Writing to a closed socket is not an error: the failed write
            // just ends the batch, and closure is observed via wait_for_close.
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Text(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        // Create a batch iterator backed by a stream that never yields data.
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

        // Request the next batch; both futures should remain pending.
        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

        // Dropping the client should terminate the iterator without an error.
        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Subscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::SelectAll(true)
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}