1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::from_extended_record;
16use fidl_fuchsia_diagnostics::{
17 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19 Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23use futures::channel::mpsc;
24use futures::prelude::*;
25use futures::sink::SinkExt;
26use futures::stream::FusedStream;
27use pin_project::pin_project;
28use serde::Deserialize;
29use std::future::ready;
30use std::marker::PhantomData;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use thiserror::Error;
35use zx::{self as zx, MonotonicDuration};
36
37pub type LogsArchiveReader = ArchiveReader<Logs>;
39
40pub type InspectArchiveReader = ArchiveReader<Inspect>;
42
43pub use diagnostics_data::{Data, Inspect, Logs, Severity};
44pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
45
46const RETRY_DELAY_MS: i64 = 300;
47
48#[cfg(fuchsia_api_level_at_least = "HEAD")]
49const FORMAT: Format = Format::Cbor;
50#[cfg(fuchsia_api_level_less_than = "HEAD")]
51const FORMAT: Format = Format::Json;
52
53#[derive(Debug, Error)]
55pub enum Error {
56 #[error("Failed to connect to the archive accessor")]
58 ConnectToArchive(#[source] anyhow::Error),
59
60 #[error("Failed to create the BatchIterator channel ends")]
62 CreateIteratorProxy(#[source] fidl::Error),
63
64 #[error("Failed to stream diagnostics from the accessor")]
66 StreamDiagnostics(#[source] fidl::Error),
67
68 #[error("Failed to call iterator server")]
70 GetNextCall(#[source] fidl::Error),
71
72 #[error("Received error from the GetNext response: {0:?}")]
74 GetNextReaderError(ReaderError),
75
76 #[error("Failed to read json received")]
78 ReadJson(#[source] serde_json::Error),
79
80 #[cfg(fuchsia_api_level_at_least = "HEAD")]
82 #[error("Failed to read cbor received")]
83 ReadCbor(#[source] anyhow::Error),
84
85 #[error("Failed to parse the diagnostics data from the json received")]
87 ParseDiagnosticsData(#[source] serde_json::Error),
88
89 #[error("Failed to read vmo from the response")]
91 ReadVmo(#[source] zx::Status),
92}
93
94pub struct ComponentSelector {
96 moniker: Vec<String>,
97 tree_selectors: Vec<String>,
98}
99
100impl ComponentSelector {
101 pub fn new(moniker: Vec<String>) -> Self {
106 Self { moniker, tree_selectors: Vec::new() }
107 }
108
109 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
111 self.tree_selectors.push(tree_selector.into());
112 self
113 }
114
115 fn moniker_str(&self) -> String {
116 self.moniker.join("/")
117 }
118}
119
120pub trait ToSelectorArguments {
122 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
124}
125
126pub trait ToComponentSelectorArguments {
128 fn to_component_selector_arguments(self) -> ComponentSelector;
130}
131
132impl ToComponentSelectorArguments for &str {
133 fn to_component_selector_arguments(self) -> ComponentSelector {
134 if self.contains("\\:") {
135 ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
137 } else {
138 ComponentSelector::new(
140 selectors::sanitize_moniker_for_selectors(self)
141 .split("/")
142 .map(|value| value.to_string())
143 .collect(),
144 )
145 .with_tree_selector("[...]root")
146 }
147 }
148}
149
150impl ToComponentSelectorArguments for String {
151 fn to_component_selector_arguments(self) -> ComponentSelector {
152 self.as_str().to_component_selector_arguments()
153 }
154}
155
156impl ToComponentSelectorArguments for ComponentSelector {
157 fn to_component_selector_arguments(self) -> ComponentSelector {
158 self
159 }
160}
161
162impl ToSelectorArguments for String {
163 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
164 Box::new([SelectorArgument::RawSelector(self)].into_iter())
165 }
166}
167
168impl ToSelectorArguments for &str {
169 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
171 }
172}
173
174impl ToSelectorArguments for ComponentSelector {
175 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176 let moniker = self.moniker_str();
177 if self.tree_selectors.is_empty() {
179 Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
180 } else {
181 Box::new(
182 self.tree_selectors
183 .into_iter()
184 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
185 )
186 }
187 }
188}
189
190impl ToSelectorArguments for Selector {
191 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
192 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
193 }
194}
195
196pub trait SerializableValue: private::Sealed {
198 const FORMAT_OF_VALUE: Format;
200}
201
202pub trait CheckResponse: private::Sealed {
204 fn has_payload(&self) -> bool;
206}
207
208mod private {
212 pub trait Sealed {}
213}
214impl private::Sealed for serde_json::Value {}
215impl private::Sealed for ciborium::Value {}
216impl<D: DiagnosticsData> private::Sealed for Data<D> {}
217
218impl<D: DiagnosticsData> CheckResponse for Data<D> {
219 fn has_payload(&self) -> bool {
220 self.payload.is_some()
221 }
222}
223
224impl SerializableValue for serde_json::Value {
225 const FORMAT_OF_VALUE: Format = Format::Json;
226}
227
228impl CheckResponse for serde_json::Value {
229 fn has_payload(&self) -> bool {
230 match self {
231 serde_json::Value::Object(obj) => {
232 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
233 }
234 _ => false,
235 }
236 }
237}
238
239#[cfg(fuchsia_api_level_at_least = "HEAD")]
240impl SerializableValue for ciborium::Value {
241 const FORMAT_OF_VALUE: Format = Format::Cbor;
242}
243
244impl CheckResponse for ciborium::Value {
245 fn has_payload(&self) -> bool {
246 match self {
247 ciborium::Value::Map(m) => {
248 let payload_key = ciborium::Value::Text("payload".into());
249 m.iter().any(|(key, _)| *key == payload_key)
250 }
251 _ => false,
252 }
253 }
254}
255
256#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
258pub enum RetryConfig {
259 MinSchemaCount(usize),
263}
264
265impl RetryConfig {
266 pub fn always() -> Self {
268 Self::MinSchemaCount(1)
269 }
270
271 pub fn never() -> Self {
273 Self::MinSchemaCount(0)
274 }
275
276 fn should_retry(&self, result_count: usize) -> bool {
278 match self {
279 Self::MinSchemaCount(min) => *min > result_count,
280 }
281 }
282}
283
284pub trait DiagnosticsDataType: private::Sealed {}
286
287impl private::Sealed for Logs {}
288
289impl private::Sealed for Inspect {}
290
291impl DiagnosticsDataType for Logs {}
292
293impl DiagnosticsDataType for Inspect {}
294
295pub struct ArchiveReader<T> {
298 archive: Option<ArchiveAccessorProxy>,
299 selectors: Vec<SelectorArgument>,
300 retry_config: RetryConfig,
301 timeout: Option<MonotonicDuration>,
302 batch_retrieval_timeout_seconds: Option<i64>,
303 max_aggregated_content_size_bytes: Option<u64>,
304 format: Option<Format>,
305 _phantom: PhantomData<T>,
306}
307
308impl<T: DiagnosticsDataType> ArchiveReader<T> {
309 pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
313 self.archive = Some(archive);
314 self
315 }
316
317 pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
320 self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
321 self
322 }
323
324 pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
326 self.retry_config = config;
327 self
328 }
329
330 pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
333 self.timeout = Some(duration);
334 self
335 }
336
337 pub fn select_all_for_component(
341 &mut self,
342 component: impl ToComponentSelectorArguments,
343 ) -> &mut Self {
344 self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
345 self
346 }
347
348 async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
350 where
351 D: DiagnosticsData + 'static,
352 {
353 let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
354 let data = match self.timeout {
355 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
356 None => data_future.await?,
357 };
358 Ok(data)
359 }
360
361 async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
362 where
363 D: DiagnosticsData,
364 Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
365 {
366 loop {
367 let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
368 let result = drain_batch_iterator::<Y>(Arc::new(iterator))
369 .filter_map(|value| ready(value.ok()))
370 .collect::<Vec<_>>()
371 .await;
372
373 if self.retry_config.should_retry(result.len()) {
374 fasync::Timer::new(fasync::MonotonicInstant::after(
375 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
376 ))
377 .await;
378 } else {
379 return Ok(result);
380 }
381 }
382 }
383
384 fn batch_iterator<D>(
385 &self,
386 mode: StreamMode,
387 format: Format,
388 ) -> Result<BatchIteratorProxy, Error>
389 where
390 D: DiagnosticsData,
391 {
392 let archive = match &self.archive {
393 Some(archive) => archive.clone(),
394 None => client::connect_to_protocol::<ArchiveAccessorMarker>()
395 .map_err(Error::ConnectToArchive)?,
396 };
397
398 let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
399
400 let stream_parameters = StreamParameters {
401 stream_mode: Some(mode),
402 data_type: Some(D::DATA_TYPE),
403 format: Some(format),
404 client_selector_configuration: if self.selectors.is_empty() {
405 Some(ClientSelectorConfiguration::SelectAll(true))
406 } else {
407 Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
408 },
409 performance_configuration: Some(PerformanceConfiguration {
410 max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
411 batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
412 ..Default::default()
413 }),
414 ..Default::default()
415 };
416
417 archive
418 .stream_diagnostics(&stream_parameters, server_end)
419 .map_err(Error::StreamDiagnostics)?;
420 Ok(iterator)
421 }
422}
423
424impl ArchiveReader<Logs> {
425 pub fn logs() -> Self {
427 ArchiveReader::<Logs> {
428 timeout: None,
429 format: None,
430 selectors: vec![],
431 retry_config: RetryConfig::always(),
432 archive: None,
433 batch_retrieval_timeout_seconds: None,
434 max_aggregated_content_size_bytes: None,
435 _phantom: PhantomData,
436 }
437 }
438
439 #[doc(hidden)]
440 pub fn with_format(&mut self, format: Format) -> &mut Self {
441 self.format = Some(format);
442 self
443 }
444
445 #[inline]
446 fn format(&self) -> Format {
447 match self.format {
448 Some(f) => f,
449 None => {
450 #[cfg(fuchsia_api_level_at_least = "HEAD")]
451 let ret = Format::Fxt;
452 #[cfg(fuchsia_api_level_less_than = "HEAD")]
453 let ret = Format::Json;
454 ret
455 }
456 }
457 }
458
459 pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
461 loop {
462 let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
463 let result = drain_batch_iterator_for_logs(Arc::new(iterator))
464 .filter_map(|value| ready(value.ok()))
465 .collect::<Vec<_>>()
466 .await;
467
468 if self.retry_config.should_retry(result.len()) {
469 fasync::Timer::new(fasync::MonotonicInstant::after(
470 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
471 ))
472 .await;
473 } else {
474 return Ok(result);
475 }
476 }
477 }
478
479 pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
482 let iterator =
483 self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
484 Ok(Subscription::new(iterator))
485 }
486}
487
488impl ArchiveReader<Inspect> {
489 pub fn inspect() -> Self {
491 ArchiveReader::<Inspect> {
492 timeout: None,
493 format: None,
494 selectors: vec![],
495 retry_config: RetryConfig::always(),
496 archive: None,
497 batch_retrieval_timeout_seconds: None,
498 max_aggregated_content_size_bytes: None,
499 _phantom: PhantomData,
500 }
501 }
502
503 pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
506 self.batch_retrieval_timeout_seconds = Some(timeout);
507 self
508 }
509
510 pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
512 self.max_aggregated_content_size_bytes = Some(limit_bytes);
513 self
514 }
515
516 pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
521 where
522 T: for<'a> Deserialize<'a>
523 + SerializableValue
524 + From<Vec<T>>
525 + CheckResponse
526 + 'static
527 + Send,
528 {
529 let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
530 let data = match self.timeout {
531 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
532 None => data_future.await?,
533 };
534 Ok(T::from(data))
535 }
536
537 pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
540 where
541 T: Iterator<Item = S>,
542 S: ToSelectorArguments,
543 {
544 for selector in selectors {
545 self.add_selector(selector);
546 }
547 self
548 }
549
550 pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
552 self.selectors.extend(selector.to_selector_arguments());
553 self
554 }
555
556 pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
558 self.snapshot_shared::<Inspect>().await
559 }
560}
561
562#[derive(Debug, Deserialize)]
563#[serde(untagged)]
564enum OneOrMany<T> {
565 Many(Vec<T>),
566 One(T),
567}
568
569fn stream_batch<T>(
570 iterator: Arc<BatchIteratorProxy>,
571 process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
572) -> impl Stream<Item = Result<T, Error>>
573where
574 T: for<'a> Deserialize<'a> + Send + 'static,
575{
576 stream! {
577 loop {
578 let next_batch = iterator
579 .get_next()
580 .await
581 .map_err(Error::GetNextCall)?
582 .map_err(Error::GetNextReaderError)?;
583 if next_batch.is_empty() {
584 return;
586 }
587 for formatted_content in next_batch {
588 let output = process_content(formatted_content)?;
589 match output {
590 OneOrMany::One(data) => yield Ok(data),
591 OneOrMany::Many(datas) => {
592 for data in datas {
593 yield Ok(data);
594 }
595 }
596 }
597 }
598 }
599 }
600}
601
602pub fn drain_batch_iterator<T>(
604 iterator: Arc<BatchIteratorProxy>,
605) -> impl Stream<Item = Result<T, Error>>
606where
607 T: for<'a> Deserialize<'a> + Send + 'static,
608{
609 stream_batch(iterator, |formatted_content| match formatted_content {
610 FormattedContent::Json(data) => {
611 let mut buf = vec![0; data.size as usize];
612 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
613 serde_json::from_slice(&buf).map_err(Error::ReadJson)
614 }
615 #[cfg(fuchsia_api_level_at_least = "HEAD")]
616 FormattedContent::Cbor(vmo) => {
617 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
618 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
619 Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
620 }
621 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
622 FormattedContent::Text(_) => unreachable!("We never expect Text"),
623 #[cfg(fuchsia_api_level_at_least = "HEAD")]
624 FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
625 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
626 unreachable!("Received unrecognized FIDL message")
627 }
628 })
629}
630
631fn drain_batch_iterator_for_logs(
632 iterator: Arc<BatchIteratorProxy>,
633) -> impl Stream<Item = Result<LogsData, Error>> {
634 stream_batch::<LogsData>(iterator, |formatted_content| match formatted_content {
635 FormattedContent::Json(data) => {
636 let mut buf = vec![0; data.size as usize];
637 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
638 serde_json::from_slice(&buf).map_err(Error::ReadJson)
639 }
640 #[cfg(fuchsia_api_level_at_least = "HEAD")]
641 FormattedContent::Fxt(vmo) => {
642 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
643 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
644 let mut current_slice: &[u8] = buf.as_ref();
645 let mut items = vec![];
646 while !current_slice.is_empty() {
647 match from_extended_record(current_slice) {
648 Ok((data, remaining)) => {
649 items.push(data);
650 current_slice = remaining;
651 }
652 Err(_) => {
653 break;
656 }
657 }
658 }
659 Ok(OneOrMany::Many(items))
660 }
661 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
662 FormattedContent::Text(_) => unreachable!("We never expect Text"),
663 #[cfg(fuchsia_api_level_at_least = "HEAD")]
664 FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
665 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
666 unreachable!("Received unrecognized FIDL message")
667 }
668 })
669}
670
671#[pin_project]
673pub struct Subscription {
674 #[pin]
675 recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
676 iterator: Arc<BatchIteratorProxy>,
677}
678
679const DATA_CHANNEL_SIZE: usize = 32;
680const ERROR_CHANNEL_SIZE: usize = 2;
681
682impl Subscription {
683 pub fn new(iterator: BatchIteratorProxy) -> Self {
686 let iterator = Arc::new(iterator);
687 Subscription {
688 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
689 iterator,
690 }
691 }
692
693 pub async fn wait_for_ready(&self) {
695 self.iterator.wait_for_ready().await.expect("doesn't disconnect");
696 }
697
698 pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
700 let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
701 let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
702 let _drain_task = fasync::Task::spawn(async move {
703 while let Some(result) = self.next().await {
704 match result {
705 Ok(value) => results_sender.send(value).await.ok(),
706 Err(e) => errors_sender.send(e).await.ok(),
707 };
708 }
709 });
710 (SubscriptionResultsStream { recv, _drain_task }, errors)
711 }
712}
713
714impl Stream for Subscription {
715 type Item = Result<LogsData, Error>;
716
717 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
718 let this = self.project();
719 this.recv.poll_next(cx)
720 }
721}
722
723impl FusedStream for Subscription {
724 fn is_terminated(&self) -> bool {
725 self.recv.is_terminated()
726 }
727}
728
729#[pin_project]
731pub struct SubscriptionResultsStream<T> {
732 #[pin]
733 recv: mpsc::Receiver<T>,
734 _drain_task: fasync::Task<()>,
735}
736
737impl<T> Stream for SubscriptionResultsStream<T>
738where
739 T: for<'a> Deserialize<'a>,
740{
741 type Item = T;
742
743 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
744 let this = self.project();
745 this.recv.poll_next(cx)
746 }
747}
748
749impl<T> FusedStream for SubscriptionResultsStream<T>
750where
751 T: for<'a> Deserialize<'a>,
752{
753 fn is_terminated(&self) -> bool {
754 self.recv.is_terminated()
755 }
756}
757
758#[cfg(test)]
759mod tests {
760 use super::*;
761 use assert_matches::assert_matches;
762 use diagnostics_assertions::assert_data_tree;
763 use diagnostics_log::{Publisher, PublisherOptions};
764 use fidl::endpoints::ServerEnd;
765 use fidl_fuchsia_diagnostics as fdiagnostics;
766 use fuchsia_component_test::{
767 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
768 };
769 use futures::TryStreamExt;
770 use log::{error, info};
771
772 const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
773
774 struct ComponentOptions {
775 publish_n_trees: u64,
776 }
777
778 async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
779 let builder = RealmBuilder::new().await?;
780 let test_component = builder
781 .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
782 .await?;
783 builder
784 .add_route(
785 Route::new()
786 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
787 .from(Ref::parent())
788 .to(&test_component),
789 )
790 .await?;
791 builder.init_mutable_config_to_empty(&test_component).await.unwrap();
792 builder
793 .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
794 .await
795 .unwrap();
796 let instance = builder.build().await?;
797 Ok(instance)
798 }
799
800 #[fuchsia::test]
803 async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
804 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
805 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
806 let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
807 let results = ArchiveReader::inspect()
808 .add_selector(format!("{component_selector}:[...]root"))
809 .snapshot()
810 .await?;
811 assert_eq!(results.len(), 1);
812 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
813 "tree-0": 0u64,
814 int: 3u64,
815 "lazy-node": {
816 a: "test",
817 child: {
818 double: 3.25,
819 },
820 }
821 });
822 let lazy_property_selector = Selector {
824 component_selector: Some(fdiagnostics::ComponentSelector {
825 moniker_segments: Some(vec![
826 fdiagnostics::StringSelector::ExactMatch(format!(
827 "realm_builder:{}",
828 instance.root.child_name()
829 )),
830 fdiagnostics::StringSelector::ExactMatch("test_component".into()),
831 ]),
832 ..Default::default()
833 }),
834 tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
835 fdiagnostics::PropertySelector {
836 node_path: vec![
837 fdiagnostics::StringSelector::ExactMatch("root".into()),
838 fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
839 ],
840 target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
841 },
842 )),
843 tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
844 ..Default::default()
845 };
846 let int_property_selector = format!("{component_selector}:[...]root:int");
847 let mut reader = ArchiveReader::inspect();
848 reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
849 let response = reader.snapshot().await?;
850 assert_eq!(response.len(), 1);
851 assert_eq!(response[0].moniker.to_string(), moniker);
852 assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
853 int: 3u64,
854 "lazy-node": {
855 a: "test"
856 }
857 });
858 Ok(())
859 }
860
861 #[fuchsia::test]
862 async fn select_all_for_moniker() {
863 let instance = start_component(ComponentOptions { publish_n_trees: 1 })
864 .await
865 .expect("component started");
866 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
867 let results = ArchiveReader::inspect()
868 .select_all_for_component(moniker)
869 .snapshot()
870 .await
871 .expect("snapshotted");
872 assert_eq!(results.len(), 1);
873 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
874 "tree-0": 0u64,
875 int: 3u64,
876 "lazy-node": {
877 a: "test",
878 child: {
879 double: 3.25,
880 },
881 }
882 });
883 }
884
885 #[fuchsia::test]
886 async fn timeout() -> Result<(), anyhow::Error> {
887 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
888
889 let mut reader = ArchiveReader::inspect();
890 reader
891 .add_selector(format!(
892 "realm_builder\\:{}/test_component:root",
893 instance.root.child_name()
894 ))
895 .with_timeout(zx::MonotonicDuration::from_nanos(0));
896 let result = reader.snapshot().await;
897 assert!(result.unwrap().is_empty());
898 Ok(())
899 }
900
901 #[fuchsia::test]
902 async fn component_selector() {
903 let selector = ComponentSelector::new(vec!["a".to_string()]);
904 assert_eq!(selector.moniker_str(), "a");
905 let arguments: Vec<_> = selector.to_selector_arguments().collect();
906 assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
907
908 let selector =
909 ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
910 assert_eq!(selector.moniker_str(), "b/c/a");
911
912 let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
913 let arguments: Vec<_> = selector.to_selector_arguments().collect();
914 assert_eq!(
915 arguments,
916 vec![
917 SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
918 SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
919 ]
920 );
921 }
922
923 #[fuchsia::test]
924 async fn custom_archive() {
925 let proxy = spawn_fake_archive(serde_json::json!({
926 "moniker": "moniker",
927 "version": 1,
928 "data_source": "Inspect",
929 "metadata": {
930 "component_url": "component-url",
931 "timestamp": 0,
932 "filename": "filename",
933 },
934 "payload": {
935 "root": {
936 "x": 1,
937 }
938 }
939 }));
940 let result =
941 ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
942 assert_eq!(result.len(), 1);
943 assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
944 }
945
946 #[fuchsia::test]
947 async fn handles_lists_correctly_on_snapshot_raw() {
948 let value = serde_json::json!({
949 "moniker": "moniker",
950 "version": 1,
951 "data_source": "Inspect",
952 "metadata": {
953 "component_url": "component-url",
954 "timestamp": 0,
955 "filename": "filename",
956 },
957 "payload": {
958 "root": {
959 "x": 1,
960 }
961 }
962 });
963 let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
964 let mut reader = ArchiveReader::inspect();
965 reader.with_archive(proxy);
966 let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
967 match json_result {
968 serde_json::Value::Array(values) => {
969 assert_eq!(values.len(), 1);
970 assert_eq!(values[0], value);
971 }
972 result => panic!("unexpected result: {result:?}"),
973 }
974 let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
975 match cbor_result {
976 ciborium::Value::Array(values) => {
977 assert_eq!(values.len(), 1);
978 let json_result =
979 values[0].deserialized::<serde_json::Value>().expect("convert to json");
980 assert_eq!(json_result, value);
981 }
982 result => panic!("unexpected result: {result:?}"),
983 }
984 }
985
986 #[fuchsia::test(logging = false)]
987 async fn snapshot_then_subscribe() {
988 let (_instance, publisher, reader) = init_isolated_logging().await;
989 let (mut stream, _errors) =
990 reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
991 publisher.register_logger(None).unwrap();
992 info!("hello from test");
993 error!("error from test");
994 let log = stream.next().await.unwrap();
995 assert_eq!(log.msg().unwrap(), "hello from test");
996 let log = stream.next().await.unwrap();
997 assert_eq!(log.msg().unwrap(), "error from test");
998 }
999
1000 #[fuchsia::test]
1001 async fn read_many_trees_with_filtering() {
1002 let instance = start_component(ComponentOptions { publish_n_trees: 2 })
1003 .await
1004 .expect("component started");
1005 let selector = format!(
1006 "realm_builder\\:{}/test_component:[name=tree-0]root",
1007 instance.root.child_name()
1008 );
1009 let results = ArchiveReader::inspect()
1010 .add_selector(selector)
1011 .with_minimum_schema_count(1)
1013 .snapshot()
1014 .await
1015 .expect("snapshotted");
1016 assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1017 let should_have_data =
1018 results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1019 assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1020 "tree-0": 0u64,
1021 });
1022 }
1023
1024 fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1025 let (proxy, mut stream) =
1026 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1027 fasync::Task::spawn(async move {
1028 while let Some(request) = stream.try_next().await.expect("stream request") {
1029 match request {
1030 fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1031 result_stream,
1032 ..
1033 } => {
1034 let data = data_to_send.clone();
1035 fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1036 }
1037 fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1038 let _ = responder.send();
1039 }
1040 fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1041 unreachable!("Unexpected method call");
1042 }
1043 }
1044 }
1045 })
1046 .detach();
1047 proxy
1048 }
1049
1050 async fn handle_batch_iterator(
1051 data: serde_json::Value,
1052 result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1053 ) {
1054 let mut called = false;
1055 let mut stream = result_stream.into_stream();
1056 while let Some(req) = stream.try_next().await.expect("stream request") {
1057 match req {
1058 fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1059 let _ = responder.send();
1060 }
1061 fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1062 if called {
1063 responder.send(Ok(Vec::new())).expect("send response");
1064 continue;
1065 }
1066 called = true;
1067 let content = serde_json::to_string_pretty(&data).expect("json pretty");
1068 let vmo_size = content.len() as u64;
1069 let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1070 vmo.write(content.as_bytes(), 0).expect("write vmo");
1071 let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1072 responder
1073 .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1074 .expect("send response");
1075 }
1076 fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1077 unreachable!("Unexpected method call");
1078 }
1079 }
1080 }
1081 }
1082
1083 async fn create_realm() -> RealmBuilder {
1084 let builder = RealmBuilder::new().await.expect("create realm builder");
1085 let archivist = builder
1086 .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1087 .await
1088 .expect("add child archivist");
1089 builder
1090 .add_route(
1091 Route::new()
1092 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1093 .capability(
1094 Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1095 .optional(),
1096 )
1097 .capability(Capability::event_stream("stopped"))
1098 .capability(Capability::event_stream("capability_requested"))
1099 .from(Ref::parent())
1100 .to(&archivist),
1101 )
1102 .await
1103 .expect("added routes from parent to archivist");
1104 builder
1105 .add_route(
1106 Route::new()
1107 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1108 .from(&archivist)
1109 .to(Ref::parent()),
1110 )
1111 .await
1112 .expect("routed LogSink from archivist to parent");
1113 builder
1114 .add_route(
1115 Route::new()
1116 .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1117 .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
1118 .to(Ref::parent()),
1119 )
1120 .await
1121 .expect("routed ArchiveAccessor from archivist to parent");
1122 builder
1123 }
1124
1125 async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1126 let instance = create_realm().await.build().await.unwrap();
1127 let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1128 let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1129 let mut reader = ArchiveReader::logs();
1130 reader.with_archive(accessor_proxy);
1131 let options = PublisherOptions::default().use_log_sink(log_sink_client);
1132 let publisher = Publisher::new_async(options).await.unwrap();
1133 (instance, publisher, reader)
1134 }
1135
1136 #[fuchsia::test]
1137 fn retry_config_behavior() {
1138 let config = RetryConfig::MinSchemaCount(1);
1139 let got = 0;
1140
1141 assert!(config.should_retry(got));
1142
1143 let config = RetryConfig::MinSchemaCount(1);
1144 let got = 1;
1145
1146 assert!(!config.should_retry(got));
1147
1148 let config = RetryConfig::MinSchemaCount(1);
1149 let got = 2;
1150
1151 assert!(!config.should_retry(got));
1152
1153 let config = RetryConfig::MinSchemaCount(0);
1154 let got = 1;
1155
1156 assert!(!config.should_retry(got));
1157
1158 let config = RetryConfig::always();
1159 let got = 0;
1160
1161 assert!(config.should_retry(got));
1162
1163 let config = RetryConfig::never();
1164 let got = 0;
1165
1166 assert!(!config.should_retry(got));
1167 }
1168}