use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
use crate::error::AccessorError;
use crate::formatter::{
    FormattedStream, FxtPacketSerializer, JsonPacketSerializer, SerializedVmo, new_batcher,
};
use crate::inspect::ReaderServer;
use crate::inspect::repository::InspectRepository;
use crate::logs::repository::LogsRepository;
use crate::logs::shared_buffer::FxtMessage;
use crate::pipeline::Pipeline;
use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
use fidl::endpoints::{ControlHandle, RequestStream};
use fidl_fuchsia_diagnostics::{
    ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
    BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration,
    ComponentSelector, DataType, Format, FormattedContent, PerformanceConfiguration, Selector,
    SelectorArgument, StreamMode, StreamParameters, StringSelector, TreeSelector,
    TreeSelectorUnknown,
};
use fidl_fuchsia_mem::Buffer;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::StreamExt;
use futures::future::{Either, select};
use futures::prelude::*;
use futures::stream::Peekable;
use log::warn;
use selectors::FastError;
use serde::Serialize;
use std::collections::HashMap;
use std::pin::{Pin, pin};
use std::sync::Arc;
use thiserror::Error;
use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};

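/// Timeout, in seconds, for retrieving a single batch of results.
/// A non-positive value is treated as "no timeout" (`i64::MAX` seconds).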
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    pub fn from_seconds(s: i64) -> Self {
        Self(s)
    }

    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    pub fn seconds(&self) -> i64 {
        if self.0 > 0 { self.0 } else { i64::MAX }
    }
}

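/// Serves requests to stream diagnostics data (Inspect and logs) out of the repositories,
/// over both `fuchsia.diagnostics.ArchiveAccessor` and its host-side counterpart.
///
/// A minimal construction sketch, mirroring the tests at the bottom of this file
/// (`inspect_repo`, `log_repo`, `pipeline`, `scope`, and `stream` are assumed to exist):
///
/// ```ignore
/// let server = ArchiveAccessorServer::new(
///     inspect_repo,                        // Arc<InspectRepository>
///     log_repo,                            // Arc<LogsRepository>
///     4,                                   // max concurrent snapshots per reader
///     BatchRetrievalTimeout::from_seconds(60),
///     scope.new_child(),
/// );
/// server.spawn_server(pipeline, stream);
/// ```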
pub struct ArchiveAccessorServer {
    inspect_repository: Arc<InspectRepository>,
    logs_repository: Arc<LogsRepository>,
    maximum_concurrent_snapshots_per_reader: u64,
    scope: fasync::Scope,
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}

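/// Parses and validates selector arguments. Individual selectors that fail to parse are
/// logged and skipped; an empty argument list is an error.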
fn validate_and_parse_selectors(
    selector_args: Vec<SelectorArgument>,
) -> Result<Vec<Selector>, AccessorError> {
    let mut selectors = vec![];
    let mut errors = vec![];

    if selector_args.is_empty() {
        return Err(AccessorError::EmptySelectors);
    }

    for selector_arg in selector_args {
        match selectors::take_from_argument::<FastError>(selector_arg) {
            Ok(s) => selectors.push(s),
            Err(e) => errors.push(e),
        }
    }

    if !errors.is_empty() {
        warn!(errors:?; "Found errors in selector arguments");
    }

    Ok(selectors)
}

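/// Validates selectors for log streaming: only component selectors (optionally paired with
/// a bare `root` tree selector) are accepted.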
fn validate_and_parse_log_selectors(
    selector_args: Vec<SelectorArgument>,
) -> Result<Vec<ComponentSelector>, AccessorError> {
    let selectors = validate_and_parse_selectors(selector_args)?;
    let mut component_selectors = Vec::with_capacity(selectors.len());
    for selector in selectors {
        let tree_selector = selector.tree_selector.as_ref().unwrap();
        match tree_selector {
            TreeSelector::PropertySelector(_) => {
                return Err(AccessorError::InvalidLogSelector);
            }
            TreeSelector::SubtreeSelector(subtree_selector) => {
                if subtree_selector.node_path.len() != 1 {
                    return Err(AccessorError::InvalidLogSelector);
                }
                match &subtree_selector.node_path[0] {
                    StringSelector::ExactMatch(val) if val == "root" => {}
                    StringSelector::StringPattern(val) if val == "root" => {}
                    _ => {
                        return Err(AccessorError::InvalidLogSelector);
                    }
                }
            }
            TreeSelectorUnknown!() => {}
        }
        component_selectors.push(selector.component_selector.unwrap());
    }
    Ok(component_selectors)
}

impl ArchiveAccessorServer {
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

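    /// Handles a single stream request: validates the stream parameters and serves the
    /// requested data over `requests` as a batch iterator.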
    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                if format == Format::Cbor {
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        validate_and_parse_log_selectors(selectors)?
                    }
                    Some(ClientSelectorConfiguration::SelectAll(_)) => Vec::new(),
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        let logs = Box::pin(log_repo.logs_cursor_raw(mode, selectors));
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = Box::pin(log_repo.logs_cursor(mode, selectors));
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

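    /// Accepts `StreamDiagnostics` requests from `stream`, spawning a task per request that
    /// serves the resulting iterator until completion or client disconnect.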
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                        && let Some(control) = control_handle
                    {
                        e.close(control);
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}

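/// The server side of a diagnostics data stream: implementors deliver batches of
/// `FormattedContent` to the client, either over a `BatchIterator` channel or a host socket.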
pub trait ArchiveAccessorWriter {
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}

fn get_buffer_from_formatted_content(
    content: fidl_fuchsia_diagnostics::FormattedContent,
) -> Result<Buffer, AccessorError> {
    match content {
        FormattedContent::Json(json) => Ok(json),
        _ => Err(AccessorError::UnsupportedFormat),
    }
}

impl ArchiveAccessorWriter for fuchsia_async::Socket {
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        if data.is_empty() {
            return Err(IteratorError::PeerClosed);
        }
        let mut buf = vec![0];
        for value in data {
            let data = get_buffer_from_formatted_content(value)?;
            buf.resize(data.size as usize, 0);
            data.vmo.read(&mut buf, 0)?;
            let res = self.write_all(&buf).await;
            if res.is_err() {
                break;
            }
        }
        Ok(())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.on_closed().await;
    }
}

#[derive(Error, Debug)]
pub enum IteratorError {
    #[error("Peer closed")]
    PeerClosed,
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}

impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}

pub struct ArchiveIteratorRequest<R> {
    parameters: StreamParameters,
    iterator: R,
}

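/// Adapts a protocol-specific `ArchiveAccessor` request stream into a common
/// `ArchiveIteratorRequest` form, hiding whether results flow over a host socket or a
/// `BatchIterator` channel.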
pub trait ArchiveAccessorTranslator {
    type InnerDataRequestChannel;
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}

impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
    type InnerDataRequestChannel = fuchsia_async::Socket;

    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
        match StreamExt::next(self).await {
            Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
                parameters,
                responder,
                stream,
            })) => {
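                // Acknowledge the request right away; results are written back over the
                // provided socket.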
                let _ = responder.send();
                Some(ArchiveIteratorRequest {
                    iterator: fuchsia_async::Socket::from_socket(stream),
                    parameters,
                })
            }
            _ => None,
        }
    }
}

impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
    type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;

    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
        loop {
            match StreamExt::next(self).await {
                Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
                    control_handle: _,
                    result_stream,
                    stream_parameters,
                })) => {
                    return Some(ArchiveIteratorRequest {
                        iterator: result_stream.into_stream().peekable(),
                        parameters: stream_parameters,
                    });
                }
                Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
                    let _ = responder.send();
                }
                _ => return None,
            }
        }
    }
}

struct SchemaTruncationCounter {
    truncated_schemas: u64,
    total_schemas: u64,
}

impl SchemaTruncationCounter {
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
    }
}

pub(crate) struct BatchIterator<R> {
    requests: R,
    stats: Arc<BatchIteratorConnectionStats>,
    data: FormattedStream,
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    parent_trace_id: ftrace::Id,
}

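/// Tracks per-moniker byte usage in `budget_map` and reports whether `bytes` more bytes
/// still fit under `byte_limit`. Returns false when the schema should be truncated instead.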
fn maybe_update_budget(
    budget_map: &mut HashMap<ExtendedMoniker, usize>,
    moniker: &ExtendedMoniker,
    bytes: usize,
    byte_limit: usize,
) -> bool {
    if let Some(remaining_budget) = budget_map.get_mut(moniker) {
        if *remaining_budget + bytes > byte_limit {
            false
        } else {
            *remaining_budget += bytes;
            true
        }
    } else if bytes > byte_limit {
        budget_map.insert(moniker.clone(), 0);
        false
    } else {
        budget_map.insert(moniker.clone(), bytes);
        true
    }
}

impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
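    /// Creates a `BatchIterator` that serializes each `Data<D>` item into `format`,
    /// enforcing the optional per-component byte budget by truncating payloads.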
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
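                                    // Over budget for this component: drop the
                                    // payload and re-serialize the remainder.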
                                    d.drop_payload();
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

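    /// Creates a `BatchIterator` that serves raw FXT log messages, packed into VMOs up to
    /// the configured aggregate content limit (or the default chunk size target).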
    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = FxtMessage> + Send + Unpin + 'static,
    {
        let data = FxtPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

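    /// Creates a `BatchIterator` that serves JSON arrays of serialized items.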
    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

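    /// Serves batches until the data stream is exhausted or the client goes away.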
    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            let batch = {
                let wait_for_close = pin!(self.requests.wait_for_close());
                let next_data = self.data.next();
                match select(next_data, wait_for_close).await {
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    Either::Right(_) => break,
                }
            };

            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            self.stats.add_response();
            if batch.is_empty() {
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                break;
            }
        }
        Ok(())
    }
}

impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        self.stats.close_connection();
    }
}

pub struct PerformanceConfig {
    pub batch_timeout_sec: i64,
    pub aggregated_content_limit_bytes: Option<u64>,
    pub maximum_concurrent_snapshots_per_reader: u64,
}

impl PerformanceConfig {
    pub fn new(
        params: &StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<PerformanceConfig, AccessorError> {
        let batch_timeout = match params {
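            // The batch timeout may come from the legacy top-level field or from the
            // nested PerformanceConfiguration, but not from both at once.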
            StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration: None,
                ..
            }
            | StreamParameters {
                batch_retrieval_timeout_seconds,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
            StreamParameters {
                batch_retrieval_timeout_seconds: None,
                performance_configuration:
                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
                ..
            } => batch_retrieval_timeout_seconds,
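            // Both locations set the timeout: reject the request.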
            _ => return Err(AccessorError::DuplicateBatchTimeout),
        }
        .map(BatchRetrievalTimeout::from_seconds)
        .unwrap_or(default_batch_timeout_seconds);

        let aggregated_content_limit_bytes = match params {
            StreamParameters {
                performance_configuration:
                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
                ..
            } => *max_aggregate_content_size_bytes,
            _ => None,
        };

        Ok(PerformanceConfig {
            batch_timeout_sec: batch_timeout.seconds(),
            aggregated_content_limit_bytes,
            maximum_concurrent_snapshots_per_reader,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use crate::logs::shared_buffer::create_ring_buffer;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};

    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

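        // A log selector that names a subtree or property (`foo:root/bar:baz` here) must be
        // rejected with INVALID_ARGS.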
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

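        // A bare `component:root` selector is accepted.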
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.get_next().await.is_ok());
    }

    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Inspect),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("invalid".to_string()),
                                SelectorArgument::RawSelector("valid:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

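        // The invalid selector is skipped; the valid one still yields a successful response.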
        assert!(batch_iterator.get_next().await.is_ok());
    }

    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Json(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    #[fuchsia::test]
    fn socket_writer_handles_json() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.koid().unwrap();
        let text = FormattedContent::Json(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Json(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

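        // Issue a GetNext request; both the client and server futures stay pending.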
        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

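        // Dropping the in-flight request and the proxy should let run() complete cleanly.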
        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Subscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::SelectAll(true)
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}
1114}