1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::from_extended_record;
16use fidl_fuchsia_diagnostics::{
17 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19 Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23use futures::channel::mpsc;
24use futures::prelude::*;
25use futures::sink::SinkExt;
26use futures::stream::FusedStream;
27use pin_project::pin_project;
28use serde::Deserialize;
29use std::future::ready;
30use std::marker::PhantomData;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use thiserror::Error;
35use zx::{self as zx, MonotonicDuration};
36
37pub type LogsArchiveReader = ArchiveReader<Logs>;
39
40pub type InspectArchiveReader = ArchiveReader<Inspect>;
42
43pub use diagnostics_data::{Data, Inspect, Logs, Severity};
44pub use diagnostics_hierarchy::{hierarchy, DiagnosticsHierarchy, Property};
45
46const RETRY_DELAY_MS: i64 = 300;
47
48#[cfg(fuchsia_api_level_at_least = "HEAD")]
49const FORMAT: Format = Format::Cbor;
50#[cfg(fuchsia_api_level_less_than = "HEAD")]
51const FORMAT: Format = Format::Json;
52
53#[derive(Debug, Error)]
55pub enum Error {
56 #[error("Failed to connect to the archive accessor")]
58 ConnectToArchive(#[source] anyhow::Error),
59
60 #[error("Failed to create the BatchIterator channel ends")]
62 CreateIteratorProxy(#[source] fidl::Error),
63
64 #[error("Failed to stream diagnostics from the accessor")]
66 StreamDiagnostics(#[source] fidl::Error),
67
68 #[error("Failed to call iterator server")]
70 GetNextCall(#[source] fidl::Error),
71
72 #[error("Received error from the GetNext response: {0:?}")]
74 GetNextReaderError(ReaderError),
75
76 #[error("Failed to read json received")]
78 ReadJson(#[source] serde_json::Error),
79
80 #[cfg(fuchsia_api_level_at_least = "HEAD")]
82 #[error("Failed to read cbor received")]
83 ReadCbor(#[source] anyhow::Error),
84
85 #[error("Failed to parse the diagnostics data from the json received")]
87 ParseDiagnosticsData(#[source] serde_json::Error),
88
89 #[error("Failed to read vmo from the response")]
91 ReadVmo(#[source] zx::Status),
92}
93
94pub struct ComponentSelector {
96 moniker: Vec<String>,
97 tree_selectors: Vec<String>,
98}
99
100impl ComponentSelector {
101 pub fn new(moniker: Vec<String>) -> Self {
106 Self { moniker, tree_selectors: Vec::new() }
107 }
108
109 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
111 self.tree_selectors.push(tree_selector.into());
112 self
113 }
114
115 fn moniker_str(&self) -> String {
116 self.moniker.join("/")
117 }
118}
119
120pub trait ToSelectorArguments {
122 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
124}
125
126pub trait ToComponentSelectorArguments {
128 fn to_component_selector_arguments(self) -> ComponentSelector;
130}
131
132impl ToComponentSelectorArguments for &str {
133 fn to_component_selector_arguments(self) -> ComponentSelector {
134 if self.contains("\\:") {
135 ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
137 } else {
138 ComponentSelector::new(
140 selectors::sanitize_moniker_for_selectors(self)
141 .split("/")
142 .map(|value| value.to_string())
143 .collect(),
144 )
145 .with_tree_selector("[...]root")
146 }
147 }
148}
149
150impl ToComponentSelectorArguments for String {
151 fn to_component_selector_arguments(self) -> ComponentSelector {
152 self.as_str().to_component_selector_arguments()
153 }
154}
155
156impl ToComponentSelectorArguments for ComponentSelector {
157 fn to_component_selector_arguments(self) -> ComponentSelector {
158 self
159 }
160}
161
162impl ToSelectorArguments for String {
163 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
164 Box::new([SelectorArgument::RawSelector(self)].into_iter())
165 }
166}
167
168impl ToSelectorArguments for &str {
169 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
171 }
172}
173
174impl ToSelectorArguments for ComponentSelector {
175 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176 let moniker = self.moniker_str();
177 if self.tree_selectors.is_empty() {
179 Box::new([SelectorArgument::RawSelector(format!("{}:root", moniker))].into_iter())
180 } else {
181 Box::new(
182 self.tree_selectors
183 .into_iter()
184 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
185 )
186 }
187 }
188}
189
190impl ToSelectorArguments for Selector {
191 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
192 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
193 }
194}
195
196pub trait SerializableValue: private::Sealed {
198 const FORMAT_OF_VALUE: Format;
200}
201
202pub trait CheckResponse: private::Sealed {
204 fn has_payload(&self) -> bool;
206}
207
208mod private {
212 pub trait Sealed {}
213}
214impl private::Sealed for serde_json::Value {}
215impl private::Sealed for ciborium::Value {}
216impl<D: DiagnosticsData> private::Sealed for Data<D> {}
217
218impl<D: DiagnosticsData> CheckResponse for Data<D> {
219 fn has_payload(&self) -> bool {
220 self.payload.is_some()
221 }
222}
223
224impl SerializableValue for serde_json::Value {
225 const FORMAT_OF_VALUE: Format = Format::Json;
226}
227
228impl CheckResponse for serde_json::Value {
229 fn has_payload(&self) -> bool {
230 match self {
231 serde_json::Value::Object(obj) => {
232 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
233 }
234 _ => false,
235 }
236 }
237}
238
239#[cfg(fuchsia_api_level_at_least = "HEAD")]
240impl SerializableValue for ciborium::Value {
241 const FORMAT_OF_VALUE: Format = Format::Cbor;
242}
243
244impl CheckResponse for ciborium::Value {
245 fn has_payload(&self) -> bool {
246 match self {
247 ciborium::Value::Map(m) => {
248 let payload_key = ciborium::Value::Text("payload".into());
249 m.iter().any(|(key, _)| *key == payload_key)
250 }
251 _ => false,
252 }
253 }
254}
255
256#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
258pub enum RetryConfig {
259 MinSchemaCount(usize),
263}
264
265impl RetryConfig {
266 pub fn always() -> Self {
268 Self::MinSchemaCount(1)
269 }
270
271 pub fn never() -> Self {
273 Self::MinSchemaCount(0)
274 }
275
276 fn should_retry(&self, result_count: usize) -> bool {
278 match self {
279 Self::MinSchemaCount(min) => *min > result_count,
280 }
281 }
282}
283
284pub trait DiagnosticsDataType: private::Sealed {}
286
287impl private::Sealed for Logs {}
288
289impl private::Sealed for Inspect {}
290
291impl DiagnosticsDataType for Logs {}
292
293impl DiagnosticsDataType for Inspect {}
294
295pub struct ArchiveReader<T> {
298 archive: Option<ArchiveAccessorProxy>,
299 selectors: Vec<SelectorArgument>,
300 retry_config: RetryConfig,
301 timeout: Option<MonotonicDuration>,
302 batch_retrieval_timeout_seconds: Option<i64>,
303 max_aggregated_content_size_bytes: Option<u64>,
304 format: Option<Format>,
305 _phantom: PhantomData<T>,
306}
307
308impl<T: DiagnosticsDataType> ArchiveReader<T> {
309 pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
313 self.archive = Some(archive);
314 self
315 }
316
317 pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
320 self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
321 self
322 }
323
324 pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
326 self.retry_config = config;
327 self
328 }
329
330 pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
333 self.timeout = Some(duration);
334 self
335 }
336
337 pub fn select_all_for_component(
341 &mut self,
342 component: impl ToComponentSelectorArguments,
343 ) -> &mut Self {
344 self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
345 self
346 }
347
348 async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
350 where
351 D: DiagnosticsData + 'static,
352 {
353 let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
354 let data = match self.timeout {
355 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
356 None => data_future.await?,
357 };
358 Ok(data)
359 }
360
361 async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
362 where
363 D: DiagnosticsData,
364 Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
365 {
366 loop {
367 let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
368 let result = drain_batch_iterator::<Y>(Arc::new(iterator))
369 .filter_map(|value| ready(value.ok()))
370 .collect::<Vec<_>>()
371 .await;
372
373 if self.retry_config.should_retry(result.len()) {
374 fasync::Timer::new(fasync::MonotonicInstant::after(
375 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
376 ))
377 .await;
378 } else {
379 return Ok(result);
380 }
381 }
382 }
383
384 fn batch_iterator<D>(
385 &self,
386 mode: StreamMode,
387 format: Format,
388 ) -> Result<BatchIteratorProxy, Error>
389 where
390 D: DiagnosticsData,
391 {
392 let archive = match &self.archive {
393 Some(archive) => archive.clone(),
394 None => client::connect_to_protocol::<ArchiveAccessorMarker>()
395 .map_err(Error::ConnectToArchive)?,
396 };
397
398 let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
399
400 let stream_parameters = StreamParameters {
401 stream_mode: Some(mode),
402 data_type: Some(D::DATA_TYPE),
403 format: Some(format),
404 client_selector_configuration: if self.selectors.is_empty() {
405 Some(ClientSelectorConfiguration::SelectAll(true))
406 } else {
407 Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
408 },
409 performance_configuration: Some(PerformanceConfiguration {
410 max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
411 batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
412 ..Default::default()
413 }),
414 ..Default::default()
415 };
416
417 archive
418 .stream_diagnostics(&stream_parameters, server_end)
419 .map_err(Error::StreamDiagnostics)?;
420 Ok(iterator)
421 }
422}
423
424impl ArchiveReader<Logs> {
425 pub fn logs() -> Self {
427 ArchiveReader::<Logs> {
428 timeout: None,
429 format: None,
430 selectors: vec![],
431 retry_config: RetryConfig::always(),
432 archive: None,
433 batch_retrieval_timeout_seconds: None,
434 max_aggregated_content_size_bytes: None,
435 _phantom: PhantomData,
436 }
437 }
438
439 #[doc(hidden)]
440 pub fn with_format(&mut self, format: Format) -> &mut Self {
441 self.format = Some(format);
442 self
443 }
444
445 #[inline]
446 fn format(&self) -> Format {
447 match self.format {
448 Some(f) => f,
449 None => {
450 #[cfg(fuchsia_api_level_at_least = "HEAD")]
451 let ret = Format::Fxt;
452 #[cfg(fuchsia_api_level_less_than = "HEAD")]
453 let ret = Format::Json;
454 ret
455 }
456 }
457 }
458
459 pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
461 loop {
462 let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
463 let result = drain_batch_iterator_for_logs(Arc::new(iterator))
464 .filter_map(|value| ready(value.ok()))
465 .collect::<Vec<_>>()
466 .await;
467
468 if self.retry_config.should_retry(result.len()) {
469 fasync::Timer::new(fasync::MonotonicInstant::after(
470 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
471 ))
472 .await;
473 } else {
474 return Ok(result);
475 }
476 }
477 }
478
479 pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
482 let iterator =
483 self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
484 Ok(Subscription::new(iterator))
485 }
486}
487
488impl ArchiveReader<Inspect> {
489 pub fn inspect() -> Self {
491 ArchiveReader::<Inspect> {
492 timeout: None,
493 format: None,
494 selectors: vec![],
495 retry_config: RetryConfig::always(),
496 archive: None,
497 batch_retrieval_timeout_seconds: None,
498 max_aggregated_content_size_bytes: None,
499 _phantom: PhantomData,
500 }
501 }
502
503 pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
506 self.batch_retrieval_timeout_seconds = Some(timeout);
507 self
508 }
509
510 pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
512 self.max_aggregated_content_size_bytes = Some(limit_bytes);
513 self
514 }
515
516 pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
521 where
522 T: for<'a> Deserialize<'a>
523 + SerializableValue
524 + From<Vec<T>>
525 + CheckResponse
526 + 'static
527 + Send,
528 {
529 let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
530 let data = match self.timeout {
531 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
532 None => data_future.await?,
533 };
534 Ok(T::from(data))
535 }
536
537 pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
540 where
541 T: Iterator<Item = S>,
542 S: ToSelectorArguments,
543 {
544 for selector in selectors {
545 self.add_selector(selector);
546 }
547 self
548 }
549
550 pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
552 self.selectors.extend(selector.to_selector_arguments());
553 self
554 }
555
556 pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
558 self.snapshot_shared::<Inspect>().await
559 }
560}
561
562#[derive(Debug, Deserialize)]
563#[serde(untagged)]
564enum OneOrMany<T> {
565 Many(Vec<T>),
566 One(T),
567}
568
569fn stream_batch<T>(
570 iterator: Arc<BatchIteratorProxy>,
571 process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
572) -> impl Stream<Item = Result<T, Error>>
573where
574 T: for<'a> Deserialize<'a> + Send + 'static,
575{
576 stream! {
577 loop {
578 let next_batch = iterator
579 .get_next()
580 .await
581 .map_err(Error::GetNextCall)?
582 .map_err(Error::GetNextReaderError)?;
583 if next_batch.is_empty() {
584 return;
586 }
587 for formatted_content in next_batch {
588 let output = process_content(formatted_content)?;
589 match output {
590 OneOrMany::One(data) => yield Ok(data),
591 OneOrMany::Many(datas) => {
592 for data in datas {
593 yield Ok(data);
594 }
595 }
596 }
597 }
598 }
599 }
600}
601
602fn drain_batch_iterator<T>(
603 iterator: Arc<BatchIteratorProxy>,
604) -> impl Stream<Item = Result<T, Error>>
605where
606 T: for<'a> Deserialize<'a> + Send + 'static,
607{
608 stream_batch(iterator, |formatted_content| match formatted_content {
609 FormattedContent::Json(data) => {
610 let mut buf = vec![0; data.size as usize];
611 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
612 serde_json::from_slice(&buf).map_err(Error::ReadJson)
613 }
614 #[cfg(fuchsia_api_level_at_least = "HEAD")]
615 FormattedContent::Cbor(vmo) => {
616 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
617 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
618 Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
619 }
620 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
621 FormattedContent::Text(_) => unreachable!("We never expect Text"),
622 #[cfg(fuchsia_api_level_at_least = "HEAD")]
623 FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
624 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
625 unreachable!("Received unrecognized FIDL message")
626 }
627 })
628}
629
630fn drain_batch_iterator_for_logs(
631 iterator: Arc<BatchIteratorProxy>,
632) -> impl Stream<Item = Result<LogsData, Error>> {
633 stream_batch::<LogsData>(iterator, |formatted_content| match formatted_content {
634 FormattedContent::Json(data) => {
635 let mut buf = vec![0; data.size as usize];
636 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
637 serde_json::from_slice(&buf).map_err(Error::ReadJson)
638 }
639 #[cfg(fuchsia_api_level_at_least = "HEAD")]
640 FormattedContent::Fxt(vmo) => {
641 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
642 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
643 let mut current_slice = buf.as_ref();
644 let mut ret: Option<OneOrMany<LogsData>> = None;
645 loop {
646 let (data, remaining) = from_extended_record(current_slice).unwrap();
647 ret = Some(match ret.take() {
648 Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
649 Some(OneOrMany::Many(mut many)) => {
650 many.push(data);
651 OneOrMany::Many(many)
652 }
653 None => OneOrMany::One(data),
654 });
655 if remaining.is_empty() {
656 break;
657 }
658 current_slice = remaining;
659 }
660 Ok(ret.unwrap())
661 }
662 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
663 FormattedContent::Text(_) => unreachable!("We never expect Text"),
664 #[cfg(fuchsia_api_level_at_least = "HEAD")]
665 FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
666 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
667 unreachable!("Received unrecognized FIDL message")
668 }
669 })
670}
671
672#[pin_project]
674pub struct Subscription {
675 #[pin]
676 recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
677 iterator: Arc<BatchIteratorProxy>,
678}
679
680const DATA_CHANNEL_SIZE: usize = 32;
681const ERROR_CHANNEL_SIZE: usize = 2;
682
683impl Subscription {
684 pub fn new(iterator: BatchIteratorProxy) -> Self {
687 let iterator = Arc::new(iterator);
688 Subscription {
689 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
690 iterator,
691 }
692 }
693
694 pub async fn wait_for_ready(&self) {
696 self.iterator.wait_for_ready().await.expect("doesn't disconnect");
697 }
698
699 pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
701 let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
702 let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
703 let _drain_task = fasync::Task::spawn(async move {
704 while let Some(result) = self.next().await {
705 match result {
706 Ok(value) => results_sender.send(value).await.ok(),
707 Err(e) => errors_sender.send(e).await.ok(),
708 };
709 }
710 });
711 (SubscriptionResultsStream { recv, _drain_task }, errors)
712 }
713}
714
715impl Stream for Subscription {
716 type Item = Result<LogsData, Error>;
717
718 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
719 let this = self.project();
720 this.recv.poll_next(cx)
721 }
722}
723
724impl FusedStream for Subscription {
725 fn is_terminated(&self) -> bool {
726 self.recv.is_terminated()
727 }
728}
729
730#[pin_project]
732pub struct SubscriptionResultsStream<T> {
733 #[pin]
734 recv: mpsc::Receiver<T>,
735 _drain_task: fasync::Task<()>,
736}
737
738impl<T> Stream for SubscriptionResultsStream<T>
739where
740 T: for<'a> Deserialize<'a>,
741{
742 type Item = T;
743
744 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
745 let this = self.project();
746 this.recv.poll_next(cx)
747 }
748}
749
750impl<T> FusedStream for SubscriptionResultsStream<T>
751where
752 T: for<'a> Deserialize<'a>,
753{
754 fn is_terminated(&self) -> bool {
755 self.recv.is_terminated()
756 }
757}
758
759#[cfg(test)]
760mod tests {
761 use super::*;
762 use assert_matches::assert_matches;
763 use diagnostics_assertions::assert_data_tree;
764 use diagnostics_log::{Publisher, PublisherOptions};
765 use fidl::endpoints::ServerEnd;
766 use fuchsia_component_test::{
767 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
768 };
769 use futures::TryStreamExt;
770 use log::{error, info};
771 use {fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_logger as flogger};
772
773 const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
774
775 struct ComponentOptions {
776 publish_n_trees: u64,
777 }
778
779 async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
780 let builder = RealmBuilder::new().await?;
781 let test_component = builder
782 .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
783 .await?;
784 builder
785 .add_route(
786 Route::new()
787 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
788 .from(Ref::parent())
789 .to(&test_component),
790 )
791 .await?;
792 builder.init_mutable_config_to_empty(&test_component).await.unwrap();
793 builder
794 .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
795 .await
796 .unwrap();
797 let instance = builder.build().await?;
798 Ok(instance)
799 }
800
801 #[fuchsia::test]
804 async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
805 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
806 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
807 let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
808 let results = ArchiveReader::inspect()
809 .add_selector(format!("{component_selector}:[...]root"))
810 .snapshot()
811 .await?;
812 assert_eq!(results.len(), 1);
813 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
814 "tree-0": 0u64,
815 int: 3u64,
816 "lazy-node": {
817 a: "test",
818 child: {
819 double: 3.25,
820 },
821 }
822 });
823 let lazy_property_selector = Selector {
825 component_selector: Some(fdiagnostics::ComponentSelector {
826 moniker_segments: Some(vec![
827 fdiagnostics::StringSelector::ExactMatch(format!(
828 "realm_builder:{}",
829 instance.root.child_name()
830 )),
831 fdiagnostics::StringSelector::ExactMatch("test_component".into()),
832 ]),
833 ..Default::default()
834 }),
835 tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
836 fdiagnostics::PropertySelector {
837 node_path: vec![
838 fdiagnostics::StringSelector::ExactMatch("root".into()),
839 fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
840 ],
841 target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
842 },
843 )),
844 tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
845 ..Default::default()
846 };
847 let int_property_selector = format!("{component_selector}:[...]root:int");
848 let mut reader = ArchiveReader::inspect();
849 reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
850 let response = reader.snapshot().await?;
851 assert_eq!(response.len(), 1);
852 assert_eq!(response[0].moniker.to_string(), moniker);
853 assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
854 int: 3u64,
855 "lazy-node": {
856 a: "test"
857 }
858 });
859 Ok(())
860 }
861
862 #[fuchsia::test]
863 async fn select_all_for_moniker() {
864 let instance = start_component(ComponentOptions { publish_n_trees: 1 })
865 .await
866 .expect("component started");
867 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
868 let results = ArchiveReader::inspect()
869 .select_all_for_component(moniker)
870 .snapshot()
871 .await
872 .expect("snapshotted");
873 assert_eq!(results.len(), 1);
874 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
875 "tree-0": 0u64,
876 int: 3u64,
877 "lazy-node": {
878 a: "test",
879 child: {
880 double: 3.25,
881 },
882 }
883 });
884 }
885
886 #[fuchsia::test]
887 async fn timeout() -> Result<(), anyhow::Error> {
888 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
889
890 let mut reader = ArchiveReader::inspect();
891 reader
892 .add_selector(format!(
893 "realm_builder\\:{}/test_component:root",
894 instance.root.child_name()
895 ))
896 .with_timeout(zx::MonotonicDuration::from_nanos(0));
897 let result = reader.snapshot().await;
898 assert!(result.unwrap().is_empty());
899 Ok(())
900 }
901
902 #[fuchsia::test]
903 async fn component_selector() {
904 let selector = ComponentSelector::new(vec!["a".to_string()]);
905 assert_eq!(selector.moniker_str(), "a");
906 let arguments: Vec<_> = selector.to_selector_arguments().collect();
907 assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
908
909 let selector =
910 ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
911 assert_eq!(selector.moniker_str(), "b/c/a");
912
913 let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
914 let arguments: Vec<_> = selector.to_selector_arguments().collect();
915 assert_eq!(
916 arguments,
917 vec![
918 SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
919 SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
920 ]
921 );
922 }
923
924 #[fuchsia::test]
925 async fn custom_archive() {
926 let proxy = spawn_fake_archive(serde_json::json!({
927 "moniker": "moniker",
928 "version": 1,
929 "data_source": "Inspect",
930 "metadata": {
931 "component_url": "component-url",
932 "timestamp": 0,
933 "filename": "filename",
934 },
935 "payload": {
936 "root": {
937 "x": 1,
938 }
939 }
940 }));
941 let result =
942 ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
943 assert_eq!(result.len(), 1);
944 assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
945 }
946
947 #[fuchsia::test]
948 async fn handles_lists_correctly_on_snapshot_raw() {
949 let value = serde_json::json!({
950 "moniker": "moniker",
951 "version": 1,
952 "data_source": "Inspect",
953 "metadata": {
954 "component_url": "component-url",
955 "timestamp": 0,
956 "filename": "filename",
957 },
958 "payload": {
959 "root": {
960 "x": 1,
961 }
962 }
963 });
964 let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
965 let mut reader = ArchiveReader::inspect();
966 reader.with_archive(proxy);
967 let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
968 match json_result {
969 serde_json::Value::Array(values) => {
970 assert_eq!(values.len(), 1);
971 assert_eq!(values[0], value);
972 }
973 result => panic!("unexpected result: {:?}", result),
974 }
975 let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
976 match cbor_result {
977 ciborium::Value::Array(values) => {
978 assert_eq!(values.len(), 1);
979 let json_result =
980 values[0].deserialized::<serde_json::Value>().expect("convert to json");
981 assert_eq!(json_result, value);
982 }
983 result => panic!("unexpected result: {:?}", result),
984 }
985 }
986
987 #[fuchsia::test(logging = false)]
988 async fn snapshot_then_subscribe() {
989 let (_instance, publisher, reader) = init_isolated_logging().await;
990 let (mut stream, _errors) =
991 reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
992 log::set_boxed_logger(Box::new(publisher)).unwrap();
993 info!("hello from test");
994 error!("error from test");
995 let log = stream.next().await.unwrap();
996 assert_eq!(log.msg().unwrap(), "hello from test");
997 let log = stream.next().await.unwrap();
998 assert_eq!(log.msg().unwrap(), "error from test");
999 }
1000
1001 #[fuchsia::test]
1002 async fn read_many_trees_with_filtering() {
1003 let instance = start_component(ComponentOptions { publish_n_trees: 2 })
1004 .await
1005 .expect("component started");
1006 let selector = format!(
1007 "realm_builder\\:{}/test_component:[name=tree-0]root",
1008 instance.root.child_name()
1009 );
1010 let results = ArchiveReader::inspect()
1011 .add_selector(selector)
1012 .with_minimum_schema_count(1)
1014 .snapshot()
1015 .await
1016 .expect("snapshotted");
1017 assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1018 let should_have_data =
1019 results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1020 assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1021 "tree-0": 0u64,
1022 });
1023 }
1024
1025 fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1026 let (proxy, mut stream) =
1027 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1028 fasync::Task::spawn(async move {
1029 while let Some(request) = stream.try_next().await.expect("stream request") {
1030 match request {
1031 fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1032 result_stream,
1033 ..
1034 } => {
1035 let data = data_to_send.clone();
1036 fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1037 }
1038 fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1039 let _ = responder.send();
1040 }
1041 fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1042 unreachable!("Unexpected method call");
1043 }
1044 }
1045 }
1046 })
1047 .detach();
1048 proxy
1049 }
1050
1051 async fn handle_batch_iterator(
1052 data: serde_json::Value,
1053 result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1054 ) {
1055 let mut called = false;
1056 let mut stream = result_stream.into_stream();
1057 while let Some(req) = stream.try_next().await.expect("stream request") {
1058 match req {
1059 fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1060 let _ = responder.send();
1061 }
1062 fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1063 if called {
1064 responder.send(Ok(Vec::new())).expect("send response");
1065 continue;
1066 }
1067 called = true;
1068 let content = serde_json::to_string_pretty(&data).expect("json pretty");
1069 let vmo_size = content.len() as u64;
1070 let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1071 vmo.write(content.as_bytes(), 0).expect("write vmo");
1072 let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1073 responder
1074 .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1075 .expect("send response");
1076 }
1077 fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1078 unreachable!("Unexpected method call");
1079 }
1080 }
1081 }
1082 }
1083
1084 async fn create_realm() -> RealmBuilder {
1085 let builder = RealmBuilder::new().await.expect("create realm builder");
1086 let archivist = builder
1087 .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1088 .await
1089 .expect("add child archivist");
1090 builder
1091 .add_route(
1092 Route::new()
1093 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1094 .capability(
1095 Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1096 .optional(),
1097 )
1098 .capability(Capability::event_stream("stopped"))
1099 .capability(Capability::event_stream("capability_requested"))
1100 .from(Ref::parent())
1101 .to(&archivist),
1102 )
1103 .await
1104 .expect("added routes from parent to archivist");
1105 builder
1106 .add_route(
1107 Route::new()
1108 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1109 .from(&archivist)
1110 .to(Ref::parent()),
1111 )
1112 .await
1113 .expect("routed LogSink from archivist to parent");
1114 builder
1115 .add_route(
1116 Route::new()
1117 .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1118 .from_dictionary("diagnostics-accessors")
1119 .from(&archivist)
1120 .to(Ref::parent()),
1121 )
1122 .await
1123 .expect("routed ArchiveAccessor from archivist to parent");
1124 builder
1125 }
1126
1127 async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1128 let instance = create_realm().await.build().await.unwrap();
1129 let log_sink_proxy =
1130 instance.root.connect_to_protocol_at_exposed_dir::<flogger::LogSinkMarker>().unwrap();
1131 let accessor_proxy = instance
1132 .root
1133 .connect_to_protocol_at_exposed_dir::<fdiagnostics::ArchiveAccessorMarker>()
1134 .unwrap();
1135 let mut reader = ArchiveReader::logs();
1136 reader.with_archive(accessor_proxy);
1137 let options = PublisherOptions::default()
1138 .wait_for_initial_interest(false)
1139 .use_log_sink(log_sink_proxy);
1140 let publisher = Publisher::new(options).unwrap();
1141 (instance, publisher, reader)
1142 }
1143
1144 #[fuchsia::test]
1145 fn retry_config_behavior() {
1146 let config = RetryConfig::MinSchemaCount(1);
1147 let got = 0;
1148
1149 assert!(config.should_retry(got));
1150
1151 let config = RetryConfig::MinSchemaCount(1);
1152 let got = 1;
1153
1154 assert!(!config.should_retry(got));
1155
1156 let config = RetryConfig::MinSchemaCount(1);
1157 let got = 2;
1158
1159 assert!(!config.should_retry(got));
1160
1161 let config = RetryConfig::MinSchemaCount(0);
1162 let got = 1;
1163
1164 assert!(!config.should_retry(got));
1165
1166 let config = RetryConfig::always();
1167 let got = 0;
1168
1169 assert!(config.should_retry(got));
1170
1171 let config = RetryConfig::never();
1172 let got = 0;
1173
1174 assert!(!config.should_retry(got));
1175 }
1176}