1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::{RustMessageFormatter, from_extended_record};
16use fidl_fuchsia_diagnostics::{
17 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19 Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23#[cfg(fuchsia_api_level_at_least = "HEAD")]
24use fuchsia_sync::Mutex;
25use futures::channel::mpsc;
26use futures::prelude::*;
27use futures::sink::SinkExt;
28use futures::stream::FusedStream;
29use pin_project::pin_project;
30use serde::Deserialize;
31use std::future::ready;
32use std::marker::PhantomData;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::task::{Context, Poll};
36use thiserror::Error;
37use zx::{self as zx, MonotonicDuration};
38
39pub type LogsArchiveReader = ArchiveReader<Logs>;
41
42pub type InspectArchiveReader = ArchiveReader<Inspect>;
44
45pub use diagnostics_data::{Data, Inspect, Logs, Severity};
46pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
47
48const RETRY_DELAY_MS: i64 = 300;
49
50#[cfg(fuchsia_api_level_at_least = "HEAD")]
51const FORMAT: Format = Format::Cbor;
52#[cfg(fuchsia_api_level_less_than = "HEAD")]
53const FORMAT: Format = Format::Json;
54
55#[derive(Debug, Error)]
57pub enum Error {
58 #[error("Failed to connect to the archive accessor")]
60 ConnectToArchive(#[source] anyhow::Error),
61
62 #[error("Failed to create the BatchIterator channel ends")]
64 CreateIteratorProxy(#[source] fidl::Error),
65
66 #[error("Failed to stream diagnostics from the accessor")]
68 StreamDiagnostics(#[source] fidl::Error),
69
70 #[error("Failed to call iterator server")]
72 GetNextCall(#[source] fidl::Error),
73
74 #[error("Received error from the GetNext response: {0:?}")]
76 GetNextReaderError(ReaderError),
77
78 #[error("Failed to read json received")]
80 ReadJson(#[source] serde_json::Error),
81
82 #[cfg(fuchsia_api_level_at_least = "HEAD")]
84 #[error("Failed to read cbor received")]
85 ReadCbor(#[source] anyhow::Error),
86
87 #[error("Failed to parse the diagnostics data from the json received")]
89 ParseDiagnosticsData(#[source] serde_json::Error),
90
91 #[error("Failed to read vmo from the response")]
93 ReadVmo(#[source] zx::Status),
94}
95
96pub struct ComponentSelector {
98 moniker: Vec<String>,
99 tree_selectors: Vec<String>,
100}
101
102impl ComponentSelector {
103 pub fn new(moniker: Vec<String>) -> Self {
108 Self { moniker, tree_selectors: Vec::new() }
109 }
110
111 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
113 self.tree_selectors.push(tree_selector.into());
114 self
115 }
116
117 fn moniker_str(&self) -> String {
118 self.moniker.join("/")
119 }
120}
121
122pub trait ToSelectorArguments {
124 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
126}
127
128pub trait ToComponentSelectorArguments {
130 fn to_component_selector_arguments(self) -> ComponentSelector;
132}
133
134impl ToComponentSelectorArguments for &str {
135 fn to_component_selector_arguments(self) -> ComponentSelector {
136 if self.contains("\\:") {
137 ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
139 } else {
140 ComponentSelector::new(
142 selectors::sanitize_moniker_for_selectors(self)
143 .split("/")
144 .map(|value| value.to_string())
145 .collect(),
146 )
147 .with_tree_selector("[...]root")
148 }
149 }
150}
151
152impl ToComponentSelectorArguments for String {
153 fn to_component_selector_arguments(self) -> ComponentSelector {
154 self.as_str().to_component_selector_arguments()
155 }
156}
157
158impl ToComponentSelectorArguments for ComponentSelector {
159 fn to_component_selector_arguments(self) -> ComponentSelector {
160 self
161 }
162}
163
164impl ToSelectorArguments for String {
165 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
166 Box::new([SelectorArgument::RawSelector(self)].into_iter())
167 }
168}
169
170impl ToSelectorArguments for &str {
171 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
172 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
173 }
174}
175
176impl ToSelectorArguments for ComponentSelector {
177 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
178 let moniker = self.moniker_str();
179 if self.tree_selectors.is_empty() {
181 Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
182 } else {
183 Box::new(
184 self.tree_selectors
185 .into_iter()
186 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
187 )
188 }
189 }
190}
191
192impl ToSelectorArguments for Selector {
193 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
194 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
195 }
196}
197
198pub trait SerializableValue: private::Sealed {
200 const FORMAT_OF_VALUE: Format;
202}
203
204pub trait CheckResponse: private::Sealed {
206 fn has_payload(&self) -> bool;
208}
209
210mod private {
214 pub trait Sealed {}
215}
216impl private::Sealed for serde_json::Value {}
217impl private::Sealed for ciborium::Value {}
218impl<D: DiagnosticsData> private::Sealed for Data<D> {}
219
220impl<D: DiagnosticsData> CheckResponse for Data<D> {
221 fn has_payload(&self) -> bool {
222 self.payload.is_some()
223 }
224}
225
226impl SerializableValue for serde_json::Value {
227 const FORMAT_OF_VALUE: Format = Format::Json;
228}
229
230impl CheckResponse for serde_json::Value {
231 fn has_payload(&self) -> bool {
232 match self {
233 serde_json::Value::Object(obj) => {
234 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
235 }
236 _ => false,
237 }
238 }
239}
240
241#[cfg(fuchsia_api_level_at_least = "HEAD")]
242impl SerializableValue for ciborium::Value {
243 const FORMAT_OF_VALUE: Format = Format::Cbor;
244}
245
246impl CheckResponse for ciborium::Value {
247 fn has_payload(&self) -> bool {
248 match self {
249 ciborium::Value::Map(m) => {
250 let payload_key = ciborium::Value::Text("payload".into());
251 m.iter().any(|(key, _)| *key == payload_key)
252 }
253 _ => false,
254 }
255 }
256}
257
258#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
260pub enum RetryConfig {
261 MinSchemaCount(usize),
265}
266
267impl RetryConfig {
268 pub fn always() -> Self {
270 Self::MinSchemaCount(1)
271 }
272
273 pub fn never() -> Self {
275 Self::MinSchemaCount(0)
276 }
277
278 fn should_retry(&self, result_count: usize) -> bool {
280 match self {
281 Self::MinSchemaCount(min) => *min > result_count,
282 }
283 }
284}
285
286pub trait DiagnosticsDataType: private::Sealed {}
288
289impl private::Sealed for Logs {}
290
291impl private::Sealed for Inspect {}
292
293impl DiagnosticsDataType for Logs {}
294
295impl DiagnosticsDataType for Inspect {}
296
297pub struct ArchiveReader<T> {
300 archive: Option<ArchiveAccessorProxy>,
301 selectors: Vec<SelectorArgument>,
302 retry_config: RetryConfig,
303 timeout: Option<MonotonicDuration>,
304 batch_retrieval_timeout_seconds: Option<i64>,
305 max_aggregated_content_size_bytes: Option<u64>,
306 format: Option<Format>,
307 _phantom: PhantomData<T>,
308}
309
310impl<T: DiagnosticsDataType> ArchiveReader<T> {
311 pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
315 self.archive = Some(archive);
316 self
317 }
318
319 pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
322 self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
323 self
324 }
325
326 pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
328 self.retry_config = config;
329 self
330 }
331
332 pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
335 self.timeout = Some(duration);
336 self
337 }
338
339 pub fn select_all_for_component(
343 &mut self,
344 component: impl ToComponentSelectorArguments,
345 ) -> &mut Self {
346 self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
347 self
348 }
349
350 async fn snapshot_shared<D>(&self, format: Format) -> Result<Vec<Data<D>>, Error>
352 where
353 D: DiagnosticsData + 'static,
354 {
355 let data_future = self.snapshot_inner::<D, Data<D>>(format);
356 let data = match self.timeout {
357 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
358 None => data_future.await?,
359 };
360 Ok(data)
361 }
362
363 async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
364 where
365 D: DiagnosticsData,
366 Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
367 {
368 loop {
369 let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
370 let result = drain_batch_iterator::<Y>(Arc::new(iterator))
371 .filter_map(|value| ready(value.ok()))
372 .collect::<Vec<_>>()
373 .await;
374
375 if self.retry_config.should_retry(result.len()) {
376 fasync::Timer::new(fasync::MonotonicInstant::after(
377 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
378 ))
379 .await;
380 } else {
381 return Ok(result);
382 }
383 }
384 }
385
386 fn batch_iterator<D>(
387 &self,
388 mode: StreamMode,
389 format: Format,
390 ) -> Result<BatchIteratorProxy, Error>
391 where
392 D: DiagnosticsData,
393 {
394 let archive = match &self.archive {
395 Some(archive) => archive.clone(),
396 None => client::connect_to_protocol::<ArchiveAccessorMarker>()
397 .map_err(Error::ConnectToArchive)?,
398 };
399
400 let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
401 let stream_parameters = StreamParameters {
402 stream_mode: Some(mode),
403 data_type: Some(D::DATA_TYPE),
404 format: Some(format),
405 client_selector_configuration: if self.selectors.is_empty() {
406 Some(ClientSelectorConfiguration::SelectAll(true))
407 } else {
408 Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
409 },
410 performance_configuration: Some(PerformanceConfiguration {
411 max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
412 batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
413 ..Default::default()
414 }),
415 ..Default::default()
416 };
417
418 archive
419 .stream_diagnostics(&stream_parameters, server_end)
420 .map_err(Error::StreamDiagnostics)?;
421 Ok(iterator)
422 }
423}
424
425impl ArchiveReader<Logs> {
426 pub fn logs() -> Self {
428 ArchiveReader::<Logs> {
429 timeout: None,
430 format: None,
431 selectors: vec![],
432 retry_config: RetryConfig::always(),
433 archive: None,
434 batch_retrieval_timeout_seconds: None,
435 max_aggregated_content_size_bytes: None,
436 _phantom: PhantomData,
437 }
438 }
439
440 #[doc(hidden)]
441 pub fn with_format(&mut self, format: Format) -> &mut Self {
442 self.format = Some(format);
443 self
444 }
445
446 #[inline]
447 fn format(&self) -> Format {
448 match self.format {
449 Some(f) => f,
450 None => {
451 #[cfg(fuchsia_api_level_at_least = "HEAD")]
452 let ret = Format::LegacyFxt;
453 #[cfg(fuchsia_api_level_less_than = "HEAD")]
454 let ret = Format::Json;
455 ret
456 }
457 }
458 }
459
460 pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
462 loop {
463 let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
464 let result = drain_batch_iterator_for_logs(Arc::new(iterator), Some(self.format()))
465 .filter_map(|value| ready(value.ok()))
466 .collect::<Vec<_>>()
467 .await;
468 if self.retry_config.should_retry(result.len()) {
469 fasync::Timer::new(fasync::MonotonicInstant::after(
470 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
471 ))
472 .await;
473 } else {
474 return Ok(result);
475 }
476 }
477 }
478
479 pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
482 let iterator =
483 self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
484 Ok(Subscription::new_with_format(iterator, self.format()))
485 }
486}
487
488impl ArchiveReader<Inspect> {
489 pub fn inspect() -> Self {
491 ArchiveReader::<Inspect> {
492 timeout: None,
493 format: None,
494 selectors: vec![],
495 retry_config: RetryConfig::always(),
496 archive: None,
497 batch_retrieval_timeout_seconds: None,
498 max_aggregated_content_size_bytes: None,
499 _phantom: PhantomData,
500 }
501 }
502
503 pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
506 self.batch_retrieval_timeout_seconds = Some(timeout);
507 self
508 }
509
510 pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
512 self.max_aggregated_content_size_bytes = Some(limit_bytes);
513 self
514 }
515
516 pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
521 where
522 T: for<'a> Deserialize<'a>
523 + SerializableValue
524 + From<Vec<T>>
525 + CheckResponse
526 + 'static
527 + Send,
528 {
529 let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
530 let data = match self.timeout {
531 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
532 None => data_future.await?,
533 };
534 Ok(T::from(data))
535 }
536
537 pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
540 where
541 T: Iterator<Item = S>,
542 S: ToSelectorArguments,
543 {
544 for selector in selectors {
545 self.add_selector(selector);
546 }
547 self
548 }
549
550 pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
552 self.selectors.extend(selector.to_selector_arguments());
553 self
554 }
555
556 pub fn with_format(&mut self, format: Format) -> &mut Self {
558 self.format = Some(format);
559 self
560 }
561
562 #[inline]
563 fn format(&self) -> Format {
564 match self.format {
565 Some(f) => f,
566 None => FORMAT,
567 }
568 }
569
570 pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
572 self.snapshot_shared::<Inspect>(self.format()).await
573 }
574}
575
576#[derive(Debug, Deserialize)]
577#[serde(untagged)]
578enum OneOrMany<T> {
579 Many(Vec<T>),
580 One(T),
581}
582
583fn stream_batch<T>(
584 iterator: Arc<BatchIteratorProxy>,
585 process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
586) -> impl Stream<Item = Result<T, Error>>
587where
588 T: for<'a> Deserialize<'a> + Send + 'static,
589{
590 stream! {
591 loop {
592 let next_batch = iterator
593 .get_next()
594 .await
595 .map_err(Error::GetNextCall)?
596 .map_err(Error::GetNextReaderError)?;
597 if next_batch.is_empty() {
598 return;
600 }
601 for formatted_content in next_batch {
602 let output = process_content(formatted_content)?;
603 match output {
604 OneOrMany::One(data) => yield Ok(data),
605 OneOrMany::Many(datas) => {
606 for data in datas {
607 yield Ok(data);
608 }
609 }
610 }
611 }
612 }
613 }
614}
615
616pub fn drain_batch_iterator<T>(
618 iterator: Arc<BatchIteratorProxy>,
619) -> impl Stream<Item = Result<T, Error>>
620where
621 T: for<'a> Deserialize<'a> + Send + 'static,
622{
623 stream_batch(iterator, |formatted_content| match formatted_content {
624 FormattedContent::Json(data) => {
625 let mut buf = vec![0; data.size as usize];
626 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
627 serde_json::from_slice(&buf).map_err(Error::ReadJson)
628 }
629 #[cfg(fuchsia_api_level_at_least = "HEAD")]
630 FormattedContent::Cbor(vmo) => {
631 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
632 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
633 Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
634 }
635 #[cfg(fuchsia_api_level_at_least = "HEAD")]
636 FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
637 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
638 unreachable!("Received unrecognized FIDL message")
639 }
640 })
641}
642
643fn drain_batch_iterator_for_logs(
644 iterator: Arc<BatchIteratorProxy>,
645 _format: Option<Format>,
646) -> impl Stream<Item = Result<LogsData, Error>> {
647 #[cfg(fuchsia_api_level_at_least = "HEAD")]
648 let parser = Arc::new(Mutex::new(diagnostics_message::MessageParser::default()));
649 stream_batch::<LogsData>(iterator, move |formatted_content| match formatted_content {
650 FormattedContent::Json(data) => {
651 let mut buf = vec![0; data.size as usize];
652 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
653 serde_json::from_slice(&buf).map_err(Error::ReadJson)
654 }
655 #[cfg(fuchsia_api_level_at_least = "HEAD")]
656 FormattedContent::Fxt(vmo) => {
657 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
658 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
659 let mut current_slice: &[u8] = &buf;
660 let mut items = vec![];
661 let mut parser = parser.lock();
662
663 while !current_slice.is_empty() {
664 if _format == Some(Format::Fxt) {
665 match parser.parse_next(current_slice, RustMessageFormatter) {
666 Ok((maybe_data, remaining)) => {
667 assert!(remaining.len() < current_slice.len(), "Parser must advance");
668 if let Some(data) = maybe_data {
669 items.push(data);
670 }
671 current_slice = remaining;
672 }
673 Err(_) => {
674 break;
677 }
678 }
679 } else {
680 match from_extended_record(current_slice) {
681 Ok((data, remaining)) => {
682 items.push(data);
683 current_slice = remaining;
684 }
685 Err(_) => {
686 break;
689 }
690 }
691 }
692 }
693 Ok(OneOrMany::Many(items))
694 }
695 #[cfg(fuchsia_api_level_at_least = "HEAD")]
696 FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
697 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
698 unreachable!("Received unrecognized FIDL message")
699 }
700 })
701}
702
703#[pin_project]
705pub struct Subscription {
706 #[pin]
707 recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
708 iterator: Arc<BatchIteratorProxy>,
709}
710
711const DATA_CHANNEL_SIZE: usize = 32;
712const ERROR_CHANNEL_SIZE: usize = 2;
713
714impl Subscription {
715 pub fn new(iterator: BatchIteratorProxy) -> Self {
718 let iterator = Arc::new(iterator);
719 Subscription {
720 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone(), None).fuse()),
721 iterator,
722 }
723 }
724
725 pub fn new_with_format(iterator: BatchIteratorProxy, format: Format) -> Self {
728 let iterator = Arc::new(iterator);
729 Subscription {
730 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone(), Some(format)).fuse()),
731 iterator,
732 }
733 }
734
735 pub async fn wait_for_ready(&self) {
737 self.iterator.wait_for_ready().await.expect("doesn't disconnect");
738 }
739
740 pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
742 let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
743 let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
744 let _drain_task = fasync::Task::spawn(async move {
745 while let Some(result) = self.next().await {
746 match result {
747 Ok(value) => results_sender.send(value).await.ok(),
748 Err(e) => errors_sender.send(e).await.ok(),
749 };
750 }
751 });
752 (SubscriptionResultsStream { recv, _drain_task }, errors)
753 }
754}
755
756impl Stream for Subscription {
757 type Item = Result<LogsData, Error>;
758
759 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
760 let this = self.project();
761 this.recv.poll_next(cx)
762 }
763}
764
765impl FusedStream for Subscription {
766 fn is_terminated(&self) -> bool {
767 self.recv.is_terminated()
768 }
769}
770
771#[pin_project]
773pub struct SubscriptionResultsStream<T> {
774 #[pin]
775 recv: mpsc::Receiver<T>,
776 _drain_task: fasync::Task<()>,
777}
778
779impl<T> Stream for SubscriptionResultsStream<T>
780where
781 T: for<'a> Deserialize<'a>,
782{
783 type Item = T;
784
785 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
786 let this = self.project();
787 this.recv.poll_next(cx)
788 }
789}
790
791impl<T> FusedStream for SubscriptionResultsStream<T>
792where
793 T: for<'a> Deserialize<'a>,
794{
795 fn is_terminated(&self) -> bool {
796 self.recv.is_terminated()
797 }
798}
799
800#[cfg(test)]
801mod tests {
802 use super::*;
803 use assert_matches::assert_matches;
804 use diagnostics_assertions::assert_data_tree;
805 use diagnostics_log::{Publisher, PublisherOptions};
806 use fidl::endpoints::ServerEnd;
807 use fidl_fuchsia_diagnostics as fdiagnostics;
808 use fuchsia_component_test::{
809 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
810 };
811 use futures::TryStreamExt;
812 use log::{error, info};
813
814 const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
815
816 struct ComponentOptions {
817 publish_n_trees: u64,
818 }
819
820 async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
821 let builder = RealmBuilder::new().await?;
822 let test_component = builder
823 .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
824 .await?;
825 builder
826 .add_route(
827 Route::new()
828 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
829 .from(Ref::parent())
830 .to(&test_component),
831 )
832 .await?;
833 builder.init_mutable_config_to_empty(&test_component).await.unwrap();
834 builder
835 .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
836 .await
837 .unwrap();
838 let instance = builder.build().await?;
839 Ok(instance)
840 }
841
842 #[fuchsia::test]
845 async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
846 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
847 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
848 let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
849 let results = ArchiveReader::inspect()
850 .add_selector(format!("{component_selector}:[...]root"))
851 .snapshot()
852 .await?;
853 assert_eq!(results.len(), 1);
854 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
855 "tree-0": 0u64,
856 int: 3u64,
857 "lazy-node": {
858 a: "test",
859 child: {
860 double: 3.25,
861 },
862 }
863 });
864 let lazy_property_selector = Selector {
866 component_selector: Some(fdiagnostics::ComponentSelector {
867 moniker_segments: Some(vec![
868 fdiagnostics::StringSelector::ExactMatch(format!(
869 "realm_builder:{}",
870 instance.root.child_name()
871 )),
872 fdiagnostics::StringSelector::ExactMatch("test_component".into()),
873 ]),
874 ..Default::default()
875 }),
876 tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
877 fdiagnostics::PropertySelector {
878 node_path: vec![
879 fdiagnostics::StringSelector::ExactMatch("root".into()),
880 fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
881 ],
882 target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
883 },
884 )),
885 tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
886 ..Default::default()
887 };
888 let int_property_selector = format!("{component_selector}:[...]root:int");
889 let mut reader = ArchiveReader::inspect();
890 reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
891 let response = reader.snapshot().await?;
892 assert_eq!(response.len(), 1);
893 assert_eq!(response[0].moniker.to_string(), moniker);
894 assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
895 int: 3u64,
896 "lazy-node": {
897 a: "test"
898 }
899 });
900 Ok(())
901 }
902
903 #[fuchsia::test]
904 async fn select_all_for_moniker() {
905 let instance = start_component(ComponentOptions { publish_n_trees: 1 })
906 .await
907 .expect("component started");
908 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
909 let results = ArchiveReader::inspect()
910 .select_all_for_component(moniker)
911 .snapshot()
912 .await
913 .expect("snapshotted");
914 assert_eq!(results.len(), 1);
915 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
916 "tree-0": 0u64,
917 int: 3u64,
918 "lazy-node": {
919 a: "test",
920 child: {
921 double: 3.25,
922 },
923 }
924 });
925 }
926
927 #[fuchsia::test]
928 async fn timeout() -> Result<(), anyhow::Error> {
929 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
930
931 let mut reader = ArchiveReader::inspect();
932 reader
933 .add_selector(format!(
934 "realm_builder\\:{}/test_component:root",
935 instance.root.child_name()
936 ))
937 .with_timeout(zx::MonotonicDuration::from_nanos(0));
938 let result = reader.snapshot().await;
939 assert!(result.unwrap().is_empty());
940 Ok(())
941 }
942
943 #[fuchsia::test]
944 async fn component_selector() {
945 let selector = ComponentSelector::new(vec!["a".to_string()]);
946 assert_eq!(selector.moniker_str(), "a");
947 let arguments: Vec<_> = selector.to_selector_arguments().collect();
948 assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
949
950 let selector =
951 ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
952 assert_eq!(selector.moniker_str(), "b/c/a");
953
954 let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
955 let arguments: Vec<_> = selector.to_selector_arguments().collect();
956 assert_eq!(
957 arguments,
958 vec![
959 SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
960 SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
961 ]
962 );
963 }
964
965 #[fuchsia::test]
966 async fn custom_archive() {
967 let proxy = spawn_fake_archive(serde_json::json!({
968 "moniker": "moniker",
969 "version": 1,
970 "data_source": "Inspect",
971 "metadata": {
972 "component_url": "component-url",
973 "timestamp": 0,
974 "filename": "filename",
975 },
976 "payload": {
977 "root": {
978 "x": 1,
979 }
980 }
981 }));
982 let result =
983 ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
984 assert_eq!(result.len(), 1);
985 assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
986 }
987
988 #[fuchsia::test]
989 async fn handles_lists_correctly_on_snapshot_raw() {
990 let value = serde_json::json!({
991 "moniker": "moniker",
992 "version": 1,
993 "data_source": "Inspect",
994 "metadata": {
995 "component_url": "component-url",
996 "timestamp": 0,
997 "filename": "filename",
998 },
999 "payload": {
1000 "root": {
1001 "x": 1,
1002 }
1003 }
1004 });
1005 let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
1006 let mut reader = ArchiveReader::inspect();
1007 reader.with_archive(proxy);
1008 let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
1009 match json_result {
1010 serde_json::Value::Array(values) => {
1011 assert_eq!(values.len(), 1);
1012 assert_eq!(values[0], value);
1013 }
1014 result => panic!("unexpected result: {result:?}"),
1015 }
1016 let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
1017 match cbor_result {
1018 ciborium::Value::Array(values) => {
1019 assert_eq!(values.len(), 1);
1020 let json_result =
1021 values[0].deserialized::<serde_json::Value>().expect("convert to json");
1022 assert_eq!(json_result, value);
1023 }
1024 result => panic!("unexpected result: {result:?}"),
1025 }
1026 }
1027
1028 #[fuchsia::test(logging = false)]
1029 async fn snapshot_then_subscribe() {
1030 let (_instance, publisher, reader) = init_isolated_logging().await;
1031 let (mut stream, _errors) =
1032 reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
1033 publisher.register_logger(None).unwrap();
1034 info!("hello from test");
1035 error!("error from test");
1036 let log = stream.next().await.unwrap();
1037 assert_eq!(log.msg().unwrap(), "hello from test");
1038 let log = stream.next().await.unwrap();
1039 assert_eq!(log.msg().unwrap(), "error from test");
1040 }
1041
1042 #[fuchsia::test]
1043 async fn read_many_trees_with_filtering() {
1044 let instance = start_component(ComponentOptions { publish_n_trees: 2 })
1045 .await
1046 .expect("component started");
1047 let selector = format!(
1048 "realm_builder\\:{}/test_component:[name=tree-0]root",
1049 instance.root.child_name()
1050 );
1051 let results = ArchiveReader::inspect()
1052 .add_selector(selector)
1053 .with_minimum_schema_count(1)
1055 .snapshot()
1056 .await
1057 .expect("snapshotted");
1058 assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1059 let should_have_data =
1060 results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1061 assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1062 "tree-0": 0u64,
1063 });
1064 }
1065
1066 fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1067 let (proxy, mut stream) =
1068 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1069 fasync::Task::spawn(async move {
1070 while let Some(request) = stream.try_next().await.expect("stream request") {
1071 match request {
1072 fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1073 result_stream,
1074 ..
1075 } => {
1076 let data = data_to_send.clone();
1077 fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1078 }
1079 fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1080 let _ = responder.send();
1081 }
1082 fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1083 unreachable!("Unexpected method call");
1084 }
1085 }
1086 }
1087 })
1088 .detach();
1089 proxy
1090 }
1091
1092 async fn handle_batch_iterator(
1093 data: serde_json::Value,
1094 result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1095 ) {
1096 let mut called = false;
1097 let mut stream = result_stream.into_stream();
1098 while let Some(req) = stream.try_next().await.expect("stream request") {
1099 match req {
1100 fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1101 let _ = responder.send();
1102 }
1103 fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1104 if called {
1105 responder.send(Ok(Vec::new())).expect("send response");
1106 continue;
1107 }
1108 called = true;
1109 let content = serde_json::to_string_pretty(&data).expect("json pretty");
1110 let vmo_size = content.len() as u64;
1111 let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1112 vmo.write(content.as_bytes(), 0).expect("write vmo");
1113 let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1114 responder
1115 .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1116 .expect("send response");
1117 }
1118 fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1119 unreachable!("Unexpected method call");
1120 }
1121 }
1122 }
1123 }
1124
1125 async fn create_realm() -> RealmBuilder {
1126 let builder = RealmBuilder::new().await.expect("create realm builder");
1127 let archivist = builder
1128 .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1129 .await
1130 .expect("add child archivist");
1131 builder
1132 .add_route(
1133 Route::new()
1134 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1135 .capability(
1136 Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1137 .optional(),
1138 )
1139 .capability(Capability::event_stream("stopped"))
1140 .capability(Capability::event_stream("capability_requested"))
1141 .from(Ref::parent())
1142 .to(&archivist),
1143 )
1144 .await
1145 .expect("added routes from parent to archivist");
1146 builder
1147 .add_route(
1148 Route::new()
1149 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1150 .from(&archivist)
1151 .to(Ref::parent()),
1152 )
1153 .await
1154 .expect("routed LogSink from archivist to parent");
1155 builder
1156 .add_route(
1157 Route::new()
1158 .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1159 .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
1160 .to(Ref::parent()),
1161 )
1162 .await
1163 .expect("routed ArchiveAccessor from archivist to parent");
1164 builder
1165 }
1166
1167 async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1168 let instance = create_realm().await.build().await.unwrap();
1169 let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1170 let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1171 let mut reader = ArchiveReader::logs();
1172 reader.with_archive(accessor_proxy);
1173 let options = PublisherOptions::default().use_log_sink(log_sink_client);
1174 let publisher = Publisher::new_async(options).await.unwrap();
1175 (instance, publisher, reader)
1176 }
1177
1178 #[fuchsia::test]
1179 fn retry_config_behavior() {
1180 let config = RetryConfig::MinSchemaCount(1);
1181 let got = 0;
1182
1183 assert!(config.should_retry(got));
1184
1185 let config = RetryConfig::MinSchemaCount(1);
1186 let got = 1;
1187
1188 assert!(!config.should_retry(got));
1189
1190 let config = RetryConfig::MinSchemaCount(1);
1191 let got = 2;
1192
1193 assert!(!config.should_retry(got));
1194
1195 let config = RetryConfig::MinSchemaCount(0);
1196 let got = 1;
1197
1198 assert!(!config.should_retry(got));
1199
1200 let config = RetryConfig::always();
1201 let got = 0;
1202
1203 assert!(config.should_retry(got));
1204
1205 let config = RetryConfig::never();
1206 let got = 0;
1207
1208 assert!(!config.should_retry(got));
1209 }
1210}