1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::from_extended_record;
16use fidl_fuchsia_diagnostics::{
17 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19 Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23use futures::channel::mpsc;
24use futures::prelude::*;
25use futures::sink::SinkExt;
26use futures::stream::FusedStream;
27use pin_project::pin_project;
28use serde::Deserialize;
29use std::future::ready;
30use std::marker::PhantomData;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use thiserror::Error;
35use zx::{self as zx, MonotonicDuration};
36
37pub type LogsArchiveReader = ArchiveReader<Logs>;
39
40pub type InspectArchiveReader = ArchiveReader<Inspect>;
42
43pub use diagnostics_data::{Data, Inspect, Logs, Severity};
44pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
45
46const RETRY_DELAY_MS: i64 = 300;
47
48#[cfg(fuchsia_api_level_at_least = "HEAD")]
49const FORMAT: Format = Format::Cbor;
50#[cfg(fuchsia_api_level_less_than = "HEAD")]
51const FORMAT: Format = Format::Json;
52
53#[derive(Debug, Error)]
55pub enum Error {
56 #[error("Failed to connect to the archive accessor")]
58 ConnectToArchive(#[source] anyhow::Error),
59
60 #[error("Failed to create the BatchIterator channel ends")]
62 CreateIteratorProxy(#[source] fidl::Error),
63
64 #[error("Failed to stream diagnostics from the accessor")]
66 StreamDiagnostics(#[source] fidl::Error),
67
68 #[error("Failed to call iterator server")]
70 GetNextCall(#[source] fidl::Error),
71
72 #[error("Received error from the GetNext response: {0:?}")]
74 GetNextReaderError(ReaderError),
75
76 #[error("Failed to read json received")]
78 ReadJson(#[source] serde_json::Error),
79
80 #[cfg(fuchsia_api_level_at_least = "HEAD")]
82 #[error("Failed to read cbor received")]
83 ReadCbor(#[source] anyhow::Error),
84
85 #[error("Failed to parse the diagnostics data from the json received")]
87 ParseDiagnosticsData(#[source] serde_json::Error),
88
89 #[error("Failed to read vmo from the response")]
91 ReadVmo(#[source] zx::Status),
92}
93
94pub struct ComponentSelector {
96 moniker: Vec<String>,
97 tree_selectors: Vec<String>,
98}
99
100impl ComponentSelector {
101 pub fn new(moniker: Vec<String>) -> Self {
106 Self { moniker, tree_selectors: Vec::new() }
107 }
108
109 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
111 self.tree_selectors.push(tree_selector.into());
112 self
113 }
114
115 fn moniker_str(&self) -> String {
116 self.moniker.join("/")
117 }
118}
119
120pub trait ToSelectorArguments {
122 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
124}
125
126pub trait ToComponentSelectorArguments {
128 fn to_component_selector_arguments(self) -> ComponentSelector;
130}
131
132impl ToComponentSelectorArguments for &str {
133 fn to_component_selector_arguments(self) -> ComponentSelector {
134 if self.contains("\\:") {
135 ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
137 } else {
138 ComponentSelector::new(
140 selectors::sanitize_moniker_for_selectors(self)
141 .split("/")
142 .map(|value| value.to_string())
143 .collect(),
144 )
145 .with_tree_selector("[...]root")
146 }
147 }
148}
149
150impl ToComponentSelectorArguments for String {
151 fn to_component_selector_arguments(self) -> ComponentSelector {
152 self.as_str().to_component_selector_arguments()
153 }
154}
155
156impl ToComponentSelectorArguments for ComponentSelector {
157 fn to_component_selector_arguments(self) -> ComponentSelector {
158 self
159 }
160}
161
162impl ToSelectorArguments for String {
163 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
164 Box::new([SelectorArgument::RawSelector(self)].into_iter())
165 }
166}
167
168impl ToSelectorArguments for &str {
169 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
171 }
172}
173
174impl ToSelectorArguments for ComponentSelector {
175 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176 let moniker = self.moniker_str();
177 if self.tree_selectors.is_empty() {
179 Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
180 } else {
181 Box::new(
182 self.tree_selectors
183 .into_iter()
184 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
185 )
186 }
187 }
188}
189
190impl ToSelectorArguments for Selector {
191 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
192 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
193 }
194}
195
196pub trait SerializableValue: private::Sealed {
198 const FORMAT_OF_VALUE: Format;
200}
201
202pub trait CheckResponse: private::Sealed {
204 fn has_payload(&self) -> bool;
206}
207
208mod private {
212 pub trait Sealed {}
213}
214impl private::Sealed for serde_json::Value {}
215impl private::Sealed for ciborium::Value {}
216impl<D: DiagnosticsData> private::Sealed for Data<D> {}
217
218impl<D: DiagnosticsData> CheckResponse for Data<D> {
219 fn has_payload(&self) -> bool {
220 self.payload.is_some()
221 }
222}
223
224impl SerializableValue for serde_json::Value {
225 const FORMAT_OF_VALUE: Format = Format::Json;
226}
227
228impl CheckResponse for serde_json::Value {
229 fn has_payload(&self) -> bool {
230 match self {
231 serde_json::Value::Object(obj) => {
232 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
233 }
234 _ => false,
235 }
236 }
237}
238
239#[cfg(fuchsia_api_level_at_least = "HEAD")]
240impl SerializableValue for ciborium::Value {
241 const FORMAT_OF_VALUE: Format = Format::Cbor;
242}
243
244impl CheckResponse for ciborium::Value {
245 fn has_payload(&self) -> bool {
246 match self {
247 ciborium::Value::Map(m) => {
248 let payload_key = ciborium::Value::Text("payload".into());
249 m.iter().any(|(key, _)| *key == payload_key)
250 }
251 _ => false,
252 }
253 }
254}
255
256#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
258pub enum RetryConfig {
259 MinSchemaCount(usize),
263}
264
265impl RetryConfig {
266 pub fn always() -> Self {
268 Self::MinSchemaCount(1)
269 }
270
271 pub fn never() -> Self {
273 Self::MinSchemaCount(0)
274 }
275
276 fn should_retry(&self, result_count: usize) -> bool {
278 match self {
279 Self::MinSchemaCount(min) => *min > result_count,
280 }
281 }
282}
283
284pub trait DiagnosticsDataType: private::Sealed {}
286
287impl private::Sealed for Logs {}
288
289impl private::Sealed for Inspect {}
290
291impl DiagnosticsDataType for Logs {}
292
293impl DiagnosticsDataType for Inspect {}
294
295pub struct ArchiveReader<T> {
298 archive: Option<ArchiveAccessorProxy>,
299 selectors: Vec<SelectorArgument>,
300 retry_config: RetryConfig,
301 timeout: Option<MonotonicDuration>,
302 batch_retrieval_timeout_seconds: Option<i64>,
303 max_aggregated_content_size_bytes: Option<u64>,
304 format: Option<Format>,
305 _phantom: PhantomData<T>,
306}
307
308impl<T: DiagnosticsDataType> ArchiveReader<T> {
309 pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
313 self.archive = Some(archive);
314 self
315 }
316
317 pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
320 self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
321 self
322 }
323
324 pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
326 self.retry_config = config;
327 self
328 }
329
330 pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
333 self.timeout = Some(duration);
334 self
335 }
336
337 pub fn select_all_for_component(
341 &mut self,
342 component: impl ToComponentSelectorArguments,
343 ) -> &mut Self {
344 self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
345 self
346 }
347
348 async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
350 where
351 D: DiagnosticsData + 'static,
352 {
353 let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
354 let data = match self.timeout {
355 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
356 None => data_future.await?,
357 };
358 Ok(data)
359 }
360
361 async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
362 where
363 D: DiagnosticsData,
364 Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
365 {
366 loop {
367 let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
368 let result = drain_batch_iterator::<Y>(Arc::new(iterator))
369 .filter_map(|value| ready(value.ok()))
370 .collect::<Vec<_>>()
371 .await;
372
373 if self.retry_config.should_retry(result.len()) {
374 fasync::Timer::new(fasync::MonotonicInstant::after(
375 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
376 ))
377 .await;
378 } else {
379 return Ok(result);
380 }
381 }
382 }
383
384 fn batch_iterator<D>(
385 &self,
386 mode: StreamMode,
387 format: Format,
388 ) -> Result<BatchIteratorProxy, Error>
389 where
390 D: DiagnosticsData,
391 {
392 let archive = match &self.archive {
393 Some(archive) => archive.clone(),
394 None => client::connect_to_protocol::<ArchiveAccessorMarker>()
395 .map_err(Error::ConnectToArchive)?,
396 };
397
398 let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
399
400 let stream_parameters = StreamParameters {
401 stream_mode: Some(mode),
402 data_type: Some(D::DATA_TYPE),
403 format: Some(format),
404 client_selector_configuration: if self.selectors.is_empty() {
405 Some(ClientSelectorConfiguration::SelectAll(true))
406 } else {
407 Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
408 },
409 performance_configuration: Some(PerformanceConfiguration {
410 max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
411 batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
412 ..Default::default()
413 }),
414 ..Default::default()
415 };
416
417 archive
418 .stream_diagnostics(&stream_parameters, server_end)
419 .map_err(Error::StreamDiagnostics)?;
420 Ok(iterator)
421 }
422}
423
424impl ArchiveReader<Logs> {
425 pub fn logs() -> Self {
427 ArchiveReader::<Logs> {
428 timeout: None,
429 format: None,
430 selectors: vec![],
431 retry_config: RetryConfig::always(),
432 archive: None,
433 batch_retrieval_timeout_seconds: None,
434 max_aggregated_content_size_bytes: None,
435 _phantom: PhantomData,
436 }
437 }
438
439 #[doc(hidden)]
440 pub fn with_format(&mut self, format: Format) -> &mut Self {
441 self.format = Some(format);
442 self
443 }
444
445 #[inline]
446 fn format(&self) -> Format {
447 match self.format {
448 Some(f) => f,
449 None => {
450 #[cfg(fuchsia_api_level_at_least = "HEAD")]
451 let ret = Format::Fxt;
452 #[cfg(fuchsia_api_level_less_than = "HEAD")]
453 let ret = Format::Json;
454 ret
455 }
456 }
457 }
458
459 pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
461 loop {
462 let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
463 let result = drain_batch_iterator_for_logs(Arc::new(iterator))
464 .filter_map(|value| ready(value.ok()))
465 .collect::<Vec<_>>()
466 .await;
467
468 if self.retry_config.should_retry(result.len()) {
469 fasync::Timer::new(fasync::MonotonicInstant::after(
470 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
471 ))
472 .await;
473 } else {
474 return Ok(result);
475 }
476 }
477 }
478
479 pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
482 let iterator =
483 self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
484 Ok(Subscription::new(iterator))
485 }
486}
487
488impl ArchiveReader<Inspect> {
489 pub fn inspect() -> Self {
491 ArchiveReader::<Inspect> {
492 timeout: None,
493 format: None,
494 selectors: vec![],
495 retry_config: RetryConfig::always(),
496 archive: None,
497 batch_retrieval_timeout_seconds: None,
498 max_aggregated_content_size_bytes: None,
499 _phantom: PhantomData,
500 }
501 }
502
503 pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
506 self.batch_retrieval_timeout_seconds = Some(timeout);
507 self
508 }
509
510 pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
512 self.max_aggregated_content_size_bytes = Some(limit_bytes);
513 self
514 }
515
516 pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
521 where
522 T: for<'a> Deserialize<'a>
523 + SerializableValue
524 + From<Vec<T>>
525 + CheckResponse
526 + 'static
527 + Send,
528 {
529 let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
530 let data = match self.timeout {
531 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
532 None => data_future.await?,
533 };
534 Ok(T::from(data))
535 }
536
537 pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
540 where
541 T: Iterator<Item = S>,
542 S: ToSelectorArguments,
543 {
544 for selector in selectors {
545 self.add_selector(selector);
546 }
547 self
548 }
549
550 pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
552 self.selectors.extend(selector.to_selector_arguments());
553 self
554 }
555
556 pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
558 self.snapshot_shared::<Inspect>().await
559 }
560}
561
562#[derive(Debug, Deserialize)]
563#[serde(untagged)]
564enum OneOrMany<T> {
565 Many(Vec<T>),
566 One(T),
567}
568
569fn stream_batch<T>(
570 iterator: Arc<BatchIteratorProxy>,
571 process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
572) -> impl Stream<Item = Result<T, Error>>
573where
574 T: for<'a> Deserialize<'a> + Send + 'static,
575{
576 stream! {
577 loop {
578 let next_batch = iterator
579 .get_next()
580 .await
581 .map_err(Error::GetNextCall)?
582 .map_err(Error::GetNextReaderError)?;
583 if next_batch.is_empty() {
584 return;
586 }
587 for formatted_content in next_batch {
588 let output = process_content(formatted_content)?;
589 match output {
590 OneOrMany::One(data) => yield Ok(data),
591 OneOrMany::Many(datas) => {
592 for data in datas {
593 yield Ok(data);
594 }
595 }
596 }
597 }
598 }
599 }
600}
601
602pub fn drain_batch_iterator<T>(
604 iterator: Arc<BatchIteratorProxy>,
605) -> impl Stream<Item = Result<T, Error>>
606where
607 T: for<'a> Deserialize<'a> + Send + 'static,
608{
609 stream_batch(iterator, |formatted_content| match formatted_content {
610 FormattedContent::Json(data) => {
611 let mut buf = vec![0; data.size as usize];
612 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
613 serde_json::from_slice(&buf).map_err(Error::ReadJson)
614 }
615 #[cfg(fuchsia_api_level_at_least = "HEAD")]
616 FormattedContent::Cbor(vmo) => {
617 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
618 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
619 Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
620 }
621 #[cfg(fuchsia_api_level_at_least = "HEAD")]
622 FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
623 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
624 unreachable!("Received unrecognized FIDL message")
625 }
626 })
627}
628
629fn drain_batch_iterator_for_logs(
630 iterator: Arc<BatchIteratorProxy>,
631) -> impl Stream<Item = Result<LogsData, Error>> {
632 stream_batch::<LogsData>(iterator, |formatted_content| match formatted_content {
633 FormattedContent::Json(data) => {
634 let mut buf = vec![0; data.size as usize];
635 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
636 serde_json::from_slice(&buf).map_err(Error::ReadJson)
637 }
638 #[cfg(fuchsia_api_level_at_least = "HEAD")]
639 FormattedContent::Fxt(vmo) => {
640 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
641 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
642 let mut current_slice: &[u8] = buf.as_ref();
643 let mut items = vec![];
644 while !current_slice.is_empty() {
645 match from_extended_record(current_slice) {
646 Ok((data, remaining)) => {
647 items.push(data);
648 current_slice = remaining;
649 }
650 Err(_) => {
651 break;
654 }
655 }
656 }
657 Ok(OneOrMany::Many(items))
658 }
659 #[cfg(fuchsia_api_level_at_least = "HEAD")]
660 FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
661 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
662 unreachable!("Received unrecognized FIDL message")
663 }
664 })
665}
666
667#[pin_project]
669pub struct Subscription {
670 #[pin]
671 recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
672 iterator: Arc<BatchIteratorProxy>,
673}
674
675const DATA_CHANNEL_SIZE: usize = 32;
676const ERROR_CHANNEL_SIZE: usize = 2;
677
678impl Subscription {
679 pub fn new(iterator: BatchIteratorProxy) -> Self {
682 let iterator = Arc::new(iterator);
683 Subscription {
684 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
685 iterator,
686 }
687 }
688
689 pub async fn wait_for_ready(&self) {
691 self.iterator.wait_for_ready().await.expect("doesn't disconnect");
692 }
693
694 pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
696 let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
697 let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
698 let _drain_task = fasync::Task::spawn(async move {
699 while let Some(result) = self.next().await {
700 match result {
701 Ok(value) => results_sender.send(value).await.ok(),
702 Err(e) => errors_sender.send(e).await.ok(),
703 };
704 }
705 });
706 (SubscriptionResultsStream { recv, _drain_task }, errors)
707 }
708}
709
710impl Stream for Subscription {
711 type Item = Result<LogsData, Error>;
712
713 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
714 let this = self.project();
715 this.recv.poll_next(cx)
716 }
717}
718
719impl FusedStream for Subscription {
720 fn is_terminated(&self) -> bool {
721 self.recv.is_terminated()
722 }
723}
724
725#[pin_project]
727pub struct SubscriptionResultsStream<T> {
728 #[pin]
729 recv: mpsc::Receiver<T>,
730 _drain_task: fasync::Task<()>,
731}
732
733impl<T> Stream for SubscriptionResultsStream<T>
734where
735 T: for<'a> Deserialize<'a>,
736{
737 type Item = T;
738
739 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
740 let this = self.project();
741 this.recv.poll_next(cx)
742 }
743}
744
745impl<T> FusedStream for SubscriptionResultsStream<T>
746where
747 T: for<'a> Deserialize<'a>,
748{
749 fn is_terminated(&self) -> bool {
750 self.recv.is_terminated()
751 }
752}
753
754#[cfg(test)]
755mod tests {
756 use super::*;
757 use assert_matches::assert_matches;
758 use diagnostics_assertions::assert_data_tree;
759 use diagnostics_log::{Publisher, PublisherOptions};
760 use fidl::endpoints::ServerEnd;
761 use fidl_fuchsia_diagnostics as fdiagnostics;
762 use fuchsia_component_test::{
763 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
764 };
765 use futures::TryStreamExt;
766 use log::{error, info};
767
768 const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
769
770 struct ComponentOptions {
771 publish_n_trees: u64,
772 }
773
774 async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
775 let builder = RealmBuilder::new().await?;
776 let test_component = builder
777 .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
778 .await?;
779 builder
780 .add_route(
781 Route::new()
782 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
783 .from(Ref::parent())
784 .to(&test_component),
785 )
786 .await?;
787 builder.init_mutable_config_to_empty(&test_component).await.unwrap();
788 builder
789 .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
790 .await
791 .unwrap();
792 let instance = builder.build().await?;
793 Ok(instance)
794 }
795
796 #[fuchsia::test]
799 async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
800 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
801 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
802 let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
803 let results = ArchiveReader::inspect()
804 .add_selector(format!("{component_selector}:[...]root"))
805 .snapshot()
806 .await?;
807 assert_eq!(results.len(), 1);
808 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
809 "tree-0": 0u64,
810 int: 3u64,
811 "lazy-node": {
812 a: "test",
813 child: {
814 double: 3.25,
815 },
816 }
817 });
818 let lazy_property_selector = Selector {
820 component_selector: Some(fdiagnostics::ComponentSelector {
821 moniker_segments: Some(vec![
822 fdiagnostics::StringSelector::ExactMatch(format!(
823 "realm_builder:{}",
824 instance.root.child_name()
825 )),
826 fdiagnostics::StringSelector::ExactMatch("test_component".into()),
827 ]),
828 ..Default::default()
829 }),
830 tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
831 fdiagnostics::PropertySelector {
832 node_path: vec![
833 fdiagnostics::StringSelector::ExactMatch("root".into()),
834 fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
835 ],
836 target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
837 },
838 )),
839 tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
840 ..Default::default()
841 };
842 let int_property_selector = format!("{component_selector}:[...]root:int");
843 let mut reader = ArchiveReader::inspect();
844 reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
845 let response = reader.snapshot().await?;
846 assert_eq!(response.len(), 1);
847 assert_eq!(response[0].moniker.to_string(), moniker);
848 assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
849 int: 3u64,
850 "lazy-node": {
851 a: "test"
852 }
853 });
854 Ok(())
855 }
856
857 #[fuchsia::test]
858 async fn select_all_for_moniker() {
859 let instance = start_component(ComponentOptions { publish_n_trees: 1 })
860 .await
861 .expect("component started");
862 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
863 let results = ArchiveReader::inspect()
864 .select_all_for_component(moniker)
865 .snapshot()
866 .await
867 .expect("snapshotted");
868 assert_eq!(results.len(), 1);
869 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
870 "tree-0": 0u64,
871 int: 3u64,
872 "lazy-node": {
873 a: "test",
874 child: {
875 double: 3.25,
876 },
877 }
878 });
879 }
880
881 #[fuchsia::test]
882 async fn timeout() -> Result<(), anyhow::Error> {
883 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
884
885 let mut reader = ArchiveReader::inspect();
886 reader
887 .add_selector(format!(
888 "realm_builder\\:{}/test_component:root",
889 instance.root.child_name()
890 ))
891 .with_timeout(zx::MonotonicDuration::from_nanos(0));
892 let result = reader.snapshot().await;
893 assert!(result.unwrap().is_empty());
894 Ok(())
895 }
896
897 #[fuchsia::test]
898 async fn component_selector() {
899 let selector = ComponentSelector::new(vec!["a".to_string()]);
900 assert_eq!(selector.moniker_str(), "a");
901 let arguments: Vec<_> = selector.to_selector_arguments().collect();
902 assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
903
904 let selector =
905 ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
906 assert_eq!(selector.moniker_str(), "b/c/a");
907
908 let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
909 let arguments: Vec<_> = selector.to_selector_arguments().collect();
910 assert_eq!(
911 arguments,
912 vec![
913 SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
914 SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
915 ]
916 );
917 }
918
919 #[fuchsia::test]
920 async fn custom_archive() {
921 let proxy = spawn_fake_archive(serde_json::json!({
922 "moniker": "moniker",
923 "version": 1,
924 "data_source": "Inspect",
925 "metadata": {
926 "component_url": "component-url",
927 "timestamp": 0,
928 "filename": "filename",
929 },
930 "payload": {
931 "root": {
932 "x": 1,
933 }
934 }
935 }));
936 let result =
937 ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
938 assert_eq!(result.len(), 1);
939 assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
940 }
941
942 #[fuchsia::test]
943 async fn handles_lists_correctly_on_snapshot_raw() {
944 let value = serde_json::json!({
945 "moniker": "moniker",
946 "version": 1,
947 "data_source": "Inspect",
948 "metadata": {
949 "component_url": "component-url",
950 "timestamp": 0,
951 "filename": "filename",
952 },
953 "payload": {
954 "root": {
955 "x": 1,
956 }
957 }
958 });
959 let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
960 let mut reader = ArchiveReader::inspect();
961 reader.with_archive(proxy);
962 let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
963 match json_result {
964 serde_json::Value::Array(values) => {
965 assert_eq!(values.len(), 1);
966 assert_eq!(values[0], value);
967 }
968 result => panic!("unexpected result: {result:?}"),
969 }
970 let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
971 match cbor_result {
972 ciborium::Value::Array(values) => {
973 assert_eq!(values.len(), 1);
974 let json_result =
975 values[0].deserialized::<serde_json::Value>().expect("convert to json");
976 assert_eq!(json_result, value);
977 }
978 result => panic!("unexpected result: {result:?}"),
979 }
980 }
981
982 #[fuchsia::test(logging = false)]
983 async fn snapshot_then_subscribe() {
984 let (_instance, publisher, reader) = init_isolated_logging().await;
985 let (mut stream, _errors) =
986 reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
987 publisher.register_logger(None).unwrap();
988 info!("hello from test");
989 error!("error from test");
990 let log = stream.next().await.unwrap();
991 assert_eq!(log.msg().unwrap(), "hello from test");
992 let log = stream.next().await.unwrap();
993 assert_eq!(log.msg().unwrap(), "error from test");
994 }
995
996 #[fuchsia::test]
997 async fn read_many_trees_with_filtering() {
998 let instance = start_component(ComponentOptions { publish_n_trees: 2 })
999 .await
1000 .expect("component started");
1001 let selector = format!(
1002 "realm_builder\\:{}/test_component:[name=tree-0]root",
1003 instance.root.child_name()
1004 );
1005 let results = ArchiveReader::inspect()
1006 .add_selector(selector)
1007 .with_minimum_schema_count(1)
1009 .snapshot()
1010 .await
1011 .expect("snapshotted");
1012 assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1013 let should_have_data =
1014 results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1015 assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1016 "tree-0": 0u64,
1017 });
1018 }
1019
1020 fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1021 let (proxy, mut stream) =
1022 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1023 fasync::Task::spawn(async move {
1024 while let Some(request) = stream.try_next().await.expect("stream request") {
1025 match request {
1026 fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1027 result_stream,
1028 ..
1029 } => {
1030 let data = data_to_send.clone();
1031 fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1032 }
1033 fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1034 let _ = responder.send();
1035 }
1036 fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1037 unreachable!("Unexpected method call");
1038 }
1039 }
1040 }
1041 })
1042 .detach();
1043 proxy
1044 }
1045
1046 async fn handle_batch_iterator(
1047 data: serde_json::Value,
1048 result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1049 ) {
1050 let mut called = false;
1051 let mut stream = result_stream.into_stream();
1052 while let Some(req) = stream.try_next().await.expect("stream request") {
1053 match req {
1054 fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1055 let _ = responder.send();
1056 }
1057 fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1058 if called {
1059 responder.send(Ok(Vec::new())).expect("send response");
1060 continue;
1061 }
1062 called = true;
1063 let content = serde_json::to_string_pretty(&data).expect("json pretty");
1064 let vmo_size = content.len() as u64;
1065 let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1066 vmo.write(content.as_bytes(), 0).expect("write vmo");
1067 let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1068 responder
1069 .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1070 .expect("send response");
1071 }
1072 fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1073 unreachable!("Unexpected method call");
1074 }
1075 }
1076 }
1077 }
1078
1079 async fn create_realm() -> RealmBuilder {
1080 let builder = RealmBuilder::new().await.expect("create realm builder");
1081 let archivist = builder
1082 .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1083 .await
1084 .expect("add child archivist");
1085 builder
1086 .add_route(
1087 Route::new()
1088 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1089 .capability(
1090 Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1091 .optional(),
1092 )
1093 .capability(Capability::event_stream("stopped"))
1094 .capability(Capability::event_stream("capability_requested"))
1095 .from(Ref::parent())
1096 .to(&archivist),
1097 )
1098 .await
1099 .expect("added routes from parent to archivist");
1100 builder
1101 .add_route(
1102 Route::new()
1103 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1104 .from(&archivist)
1105 .to(Ref::parent()),
1106 )
1107 .await
1108 .expect("routed LogSink from archivist to parent");
1109 builder
1110 .add_route(
1111 Route::new()
1112 .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1113 .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
1114 .to(Ref::parent()),
1115 )
1116 .await
1117 .expect("routed ArchiveAccessor from archivist to parent");
1118 builder
1119 }
1120
1121 async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1122 let instance = create_realm().await.build().await.unwrap();
1123 let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1124 let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1125 let mut reader = ArchiveReader::logs();
1126 reader.with_archive(accessor_proxy);
1127 let options = PublisherOptions::default().use_log_sink(log_sink_client);
1128 let publisher = Publisher::new_async(options).await.unwrap();
1129 (instance, publisher, reader)
1130 }
1131
1132 #[fuchsia::test]
1133 fn retry_config_behavior() {
1134 let config = RetryConfig::MinSchemaCount(1);
1135 let got = 0;
1136
1137 assert!(config.should_retry(got));
1138
1139 let config = RetryConfig::MinSchemaCount(1);
1140 let got = 1;
1141
1142 assert!(!config.should_retry(got));
1143
1144 let config = RetryConfig::MinSchemaCount(1);
1145 let got = 2;
1146
1147 assert!(!config.should_retry(got));
1148
1149 let config = RetryConfig::MinSchemaCount(0);
1150 let got = 1;
1151
1152 assert!(!config.should_retry(got));
1153
1154 let config = RetryConfig::always();
1155 let got = 0;
1156
1157 assert!(config.should_retry(got));
1158
1159 let config = RetryConfig::never();
1160 let got = 0;
1161
1162 assert!(!config.should_retry(got));
1163 }
1164}