diagnostics_reader/lib.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![deny(missing_docs)]

//! A library for reading Inspect and Log data from
//! the ArchiveAccessor FIDL protocol.
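//!
//! # Example
//!
//! A minimal sketch of taking an Inspect snapshot (the moniker
//! `core/example_component` below is hypothetical):
//!
//! ```ignore
//! use diagnostics_reader::ArchiveReader;
//!
//! async fn print_inspect() -> Result<(), diagnostics_reader::Error> {
//!     let data = ArchiveReader::inspect()
//!         .select_all_for_component("core/example_component")
//!         .snapshot()
//!         .await?;
//!     for schema in data {
//!         println!("{:#?}", schema.payload);
//!     }
//!     Ok(())
//! }
//! ```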
9
use async_stream::stream;
use diagnostics_data::{DiagnosticsData, LogsData};
#[cfg(fuchsia_api_level_less_than = "HEAD")]
use diagnostics_message as _;
#[cfg(fuchsia_api_level_at_least = "HEAD")]
use diagnostics_message::from_extended_record;
use fidl_fuchsia_diagnostics::{
    ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
    ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
    Selector, SelectorArgument, StreamMode, StreamParameters,
};
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_component::client;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::sink::SinkExt;
use futures::stream::FusedStream;
use pin_project::pin_project;
use serde::Deserialize;
use std::future::ready;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use thiserror::Error;
use zx::{self as zx, MonotonicDuration};

/// Alias for ArchiveReader<Logs>. Used for reading logs.
pub type LogsArchiveReader = ArchiveReader<Logs>;

/// Alias for ArchiveReader<Inspect>. Used for reading inspect.
pub type InspectArchiveReader = ArchiveReader<Inspect>;

pub use diagnostics_data::{Data, Inspect, Logs, Severity};
pub use diagnostics_hierarchy::{hierarchy, DiagnosticsHierarchy, Property};

const RETRY_DELAY_MS: i64 = 300;

#[cfg(fuchsia_api_level_at_least = "HEAD")]
const FORMAT: Format = Format::Cbor;
#[cfg(fuchsia_api_level_less_than = "HEAD")]
const FORMAT: Format = Format::Json;

/// Errors that this library can return
#[derive(Debug, Error)]
pub enum Error {
    /// Failed to connect to the archive accessor
    #[error("Failed to connect to the archive accessor")]
    ConnectToArchive(#[source] anyhow::Error),

    /// Failed to create the BatchIterator channel ends
    #[error("Failed to create the BatchIterator channel ends")]
    CreateIteratorProxy(#[source] fidl::Error),

    /// Failed to stream diagnostics from the accessor
    #[error("Failed to stream diagnostics from the accessor")]
    StreamDiagnostics(#[source] fidl::Error),

    /// Failed to call iterator server
    #[error("Failed to call iterator server")]
    GetNextCall(#[source] fidl::Error),

    /// Received error from the GetNext response
    #[error("Received error from the GetNext response: {0:?}")]
    GetNextReaderError(ReaderError),

    /// Failed to read json received
    #[error("Failed to read json received")]
    ReadJson(#[source] serde_json::Error),

    /// Failed to read cbor received
    #[cfg(fuchsia_api_level_at_least = "HEAD")]
    #[error("Failed to read cbor received")]
    ReadCbor(#[source] anyhow::Error),

    /// Failed to parse the diagnostics data from the json received
    #[error("Failed to parse the diagnostics data from the json received")]
    ParseDiagnosticsData(#[source] serde_json::Error),

    /// Failed to read vmo from the response
    #[error("Failed to read vmo from the response")]
    ReadVmo(#[source] zx::Status),
}

/// An inspect tree selector for a component.
pub struct ComponentSelector {
    moniker: Vec<String>,
    tree_selectors: Vec<String>,
}

impl ComponentSelector {
    /// Creates a new component selector.
    /// By default it selects the whole tree unless tree selectors are provided.
    /// `moniker` is the realm path relative to the realm of the running component plus the
    /// component name. For example: `["a", "b", "component"]`.
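    ///
    /// A sketch of composing a selector (the moniker segments and the tree selector
    /// below are hypothetical):
    ///
    /// ```ignore
    /// let selector = ComponentSelector::new(vec!["a".into(), "b".into(), "component".into()])
    ///     .with_tree_selector("root/status:uptime");
    /// // Converting this to selector arguments yields "a/b/component:root/status:uptime".
    /// ```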
    pub fn new(moniker: Vec<String>) -> Self {
        Self { moniker, tree_selectors: Vec::new() }
    }

    /// Select a section of the inspect tree.
    pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
        self.tree_selectors.push(tree_selector.into());
        self
    }

    fn moniker_str(&self) -> String {
        self.moniker.join("/")
    }
}

/// Trait used for things that can be converted to selector arguments.
pub trait ToSelectorArguments {
    /// Converts this to selector arguments.
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
}

/// Trait used for things that can be converted to component selector arguments.
pub trait ToComponentSelectorArguments {
    /// Converts this to component selector arguments.
    fn to_component_selector_arguments(self) -> ComponentSelector;
}

impl ToComponentSelectorArguments for &str {
    fn to_component_selector_arguments(self) -> ComponentSelector {
        if self.contains("\\:") {
            // String is already escaped, don't escape it.
            ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
        } else {
            // String isn't escaped, escape it
            ComponentSelector::new(
                selectors::sanitize_moniker_for_selectors(self)
                    .split("/")
                    .map(|value| value.to_string())
                    .collect(),
            )
            .with_tree_selector("[...]root")
        }
    }
}

impl ToComponentSelectorArguments for String {
    fn to_component_selector_arguments(self) -> ComponentSelector {
        self.as_str().to_component_selector_arguments()
    }
}

impl ToComponentSelectorArguments for ComponentSelector {
    fn to_component_selector_arguments(self) -> ComponentSelector {
        self
    }
}

impl ToSelectorArguments for String {
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
        Box::new([SelectorArgument::RawSelector(self)].into_iter())
    }
}

impl ToSelectorArguments for &str {
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
        Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
    }
}

impl ToSelectorArguments for ComponentSelector {
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
        let moniker = self.moniker_str();
        // If no tree selectors were provided, select the full tree.
        if self.tree_selectors.is_empty() {
            Box::new([SelectorArgument::RawSelector(format!("{}:root", moniker))].into_iter())
        } else {
            Box::new(
                self.tree_selectors
                    .into_iter()
                    .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
            )
        }
    }
}

impl ToSelectorArguments for Selector {
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
        Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
    }
}

/// Before unsealing this, consider whether your code belongs in this file.
pub trait SerializableValue: private::Sealed {
    /// The Format of this SerializableValue. Either JSON or CBOR.
    const FORMAT_OF_VALUE: Format;
}

/// Trait used to verify that a JSON payload has a valid diagnostics payload.
pub trait CheckResponse: private::Sealed {
    /// Returns true if the response has a valid payload.
    fn has_payload(&self) -> bool;
}

// The "sealed trait" pattern.
//
// https://rust-lang.github.io/api-guidelines/future-proofing.html
mod private {
    pub trait Sealed {}
}
impl private::Sealed for serde_json::Value {}
impl private::Sealed for ciborium::Value {}
impl<D: DiagnosticsData> private::Sealed for Data<D> {}

impl<D: DiagnosticsData> CheckResponse for Data<D> {
    fn has_payload(&self) -> bool {
        self.payload.is_some()
    }
}

impl SerializableValue for serde_json::Value {
    const FORMAT_OF_VALUE: Format = Format::Json;
}

impl CheckResponse for serde_json::Value {
    fn has_payload(&self) -> bool {
        match self {
            serde_json::Value::Object(obj) => {
                // Only present, non-null payloads count.
                obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).unwrap_or(false)
            }
            _ => false,
        }
    }
}

#[cfg(fuchsia_api_level_at_least = "HEAD")]
impl SerializableValue for ciborium::Value {
    const FORMAT_OF_VALUE: Format = Format::Cbor;
}

impl CheckResponse for ciborium::Value {
    fn has_payload(&self) -> bool {
        match self {
            ciborium::Value::Map(m) => {
                let payload_key = ciborium::Value::Text("payload".into());
                m.iter().any(|(key, _)| *key == payload_key)
            }
            _ => false,
        }
    }
}

/// Retry configuration for ArchiveReader
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RetryConfig {
    /// The minimum schema count required for a successful read.
    /// This guarantees that a read will contain at least MinSchemaCount
    /// results.
    MinSchemaCount(usize),
}

impl RetryConfig {
    /// Always retry
    pub fn always() -> Self {
        Self::MinSchemaCount(1)
    }

    /// Never retry
    pub fn never() -> Self {
        Self::MinSchemaCount(0)
    }

    /// Returns whether a retry is needed given the number of results received so far.
    fn should_retry(&self, result_count: usize) -> bool {
        match self {
            Self::MinSchemaCount(min) => *min > result_count,
        }
    }
}

/// A trait representing a type of diagnostics data.
pub trait DiagnosticsDataType: private::Sealed {}

impl private::Sealed for Logs {}

impl private::Sealed for Inspect {}

impl DiagnosticsDataType for Logs {}

impl DiagnosticsDataType for Inspect {}

/// Utility for reading diagnostics data (Inspect or Logs) of a running component using the
/// injected ArchiveAccessor service.
pub struct ArchiveReader<T> {
    archive: Option<ArchiveAccessorProxy>,
    selectors: Vec<SelectorArgument>,
    retry_config: RetryConfig,
    timeout: Option<MonotonicDuration>,
    batch_retrieval_timeout_seconds: Option<i64>,
    max_aggregated_content_size_bytes: Option<u64>,
    format: Option<Format>,
    _phantom: PhantomData<T>,
}

impl<T: DiagnosticsDataType> ArchiveReader<T> {
    /// Initializes the ArchiveReader with a custom connection to an ArchiveAccessor.
    /// By default, the connection will be initialized by connecting to
    /// fuchsia.diagnostics.ArchiveAccessor
    pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
        self.archive = Some(archive);
        self
    }

    /// Sets the minimum number of schemas expected in a result in order for the
    /// result to be considered a success.
    pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
        self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
        self
    }

    /// Sets a custom retry configuration. By default we always retry.
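    ///
    /// For example, to fail fast instead of waiting for data to appear (a sketch):
    ///
    /// ```ignore
    /// let mut reader = ArchiveReader::inspect();
    /// reader.retry(RetryConfig::never());
    /// ```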
    pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
        self.retry_config = config;
        self
    }

    /// Sets the maximum time to wait for a response from the Archive.
    /// Do not use in tests unless timeout is the expected behavior.
    pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
        self.timeout = Some(duration);
        self
    }

    /// Selects all diagnostics data for a single component.
    /// Accepts a `ComponentSelector`, or a string containing either a component selector
    /// or a moniker.
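    ///
    /// A sketch with a hypothetical moniker:
    ///
    /// ```ignore
    /// let mut reader = ArchiveReader::inspect();
    /// reader.select_all_for_component("core/example_component");
    /// ```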
    pub fn select_all_for_component(
        &mut self,
        component: impl ToComponentSelectorArguments,
    ) -> &mut Self {
        self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
        self
    }

    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
    async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
    where
        D: DiagnosticsData + 'static,
    {
        let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
        let data = match self.timeout {
            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
            None => data_future.await?,
        };
        Ok(data)
    }

    async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
    where
        D: DiagnosticsData,
        Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
    {
        loop {
            let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
            let result = drain_batch_iterator::<Y>(Arc::new(iterator))
                .filter_map(|value| ready(value.ok()))
                .collect::<Vec<_>>()
                .await;

            if self.retry_config.should_retry(result.len()) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
                ))
                .await;
            } else {
                return Ok(result);
            }
        }
    }

    fn batch_iterator<D>(
        &self,
        mode: StreamMode,
        format: Format,
    ) -> Result<BatchIteratorProxy, Error>
    where
        D: DiagnosticsData,
    {
        let archive = match &self.archive {
            Some(archive) => archive.clone(),
            None => client::connect_to_protocol::<ArchiveAccessorMarker>()
                .map_err(Error::ConnectToArchive)?,
        };

        let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        let stream_parameters = StreamParameters {
            stream_mode: Some(mode),
            data_type: Some(D::DATA_TYPE),
            format: Some(format),
            client_selector_configuration: if self.selectors.is_empty() {
                Some(ClientSelectorConfiguration::SelectAll(true))
            } else {
                Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
            },
            performance_configuration: Some(PerformanceConfiguration {
                max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
                batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
                ..Default::default()
            }),
            ..Default::default()
        };

        archive
            .stream_diagnostics(&stream_parameters, server_end)
            .map_err(Error::StreamDiagnostics)?;
        Ok(iterator)
    }
}

impl ArchiveReader<Logs> {
    /// Creates an ArchiveReader for reading logs
    pub fn logs() -> Self {
        ArchiveReader::<Logs> {
            timeout: None,
            format: None,
            selectors: vec![],
            retry_config: RetryConfig::always(),
            archive: None,
            batch_retrieval_timeout_seconds: None,
            max_aggregated_content_size_bytes: None,
            _phantom: PhantomData,
        }
    }

    #[doc(hidden)]
    pub fn with_format(&mut self, format: Format) -> &mut Self {
        self.format = Some(format);
        self
    }

    #[inline]
    fn format(&self) -> Format {
        match self.format {
            Some(f) => f,
            None => {
                #[cfg(fuchsia_api_level_at_least = "HEAD")]
                let ret = Format::Fxt;
                #[cfg(fuchsia_api_level_less_than = "HEAD")]
                let ret = Format::Json;
                ret
            }
        }
    }

    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
    pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
        loop {
            let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
            let result = drain_batch_iterator_for_logs(Arc::new(iterator))
                .filter_map(|value| ready(value.ok()))
                .collect::<Vec<_>>()
                .await;

            if self.retry_config.should_retry(result.len()) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
                ))
                .await;
            } else {
                return Ok(result);
            }
        }
    }

    /// Connects to the ArchiveAccessor and returns a stream of data containing a snapshot of the
    /// current buffer in the Archivist as well as new data that arrives.
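    ///
    /// A sketch of consuming the subscription (assumes `futures::StreamExt` is in scope
    /// and `reader` is an `ArchiveReader<Logs>`):
    ///
    /// ```ignore
    /// let (mut logs, mut _errors) = reader.snapshot_then_subscribe()?.split_streams();
    /// while let Some(log) = logs.next().await {
    ///     println!("{}", log.msg().unwrap_or(""));
    /// }
    /// ```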
    pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
        let iterator =
            self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
        Ok(Subscription::new(iterator))
    }
}

impl ArchiveReader<Inspect> {
    /// Creates an ArchiveReader for reading Inspect data.
    pub fn inspect() -> Self {
        ArchiveReader::<Inspect> {
            timeout: None,
            format: None,
            selectors: vec![],
            retry_config: RetryConfig::always(),
            archive: None,
            batch_retrieval_timeout_seconds: None,
            max_aggregated_content_size_bytes: None,
            _phantom: PhantomData,
        }
    }

    /// Sets the maximum time to wait for a single component
    /// to have its diagnostics data "pumped".
    pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
        self.batch_retrieval_timeout_seconds = Some(timeout);
        self
    }

    /// Sets the total number of bytes allowed in a single VMO read.
    pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
        self.max_aggregated_content_size_bytes = Some(limit_bytes);
        self
    }

    /// Connects to the ArchiveAccessor and returns inspect data matching provided selectors.
    /// Returns the raw json for each hierarchy fetched. This is used for CTF compatibility
    /// tests (which test various implementation details of the JSON format),
    /// and use beyond such tests is discouraged.
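    ///
    /// A sketch of reading the raw JSON array (assumes `reader` was built with
    /// `ArchiveReader::inspect()`):
    ///
    /// ```ignore
    /// let raw = reader.snapshot_raw::<serde_json::Value>().await?;
    /// assert!(raw.is_array());
    /// ```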
    pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
    where
        T: for<'a> Deserialize<'a>
            + SerializableValue
            + From<Vec<T>>
            + CheckResponse
            + 'static
            + Send,
    {
        let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
        let data = match self.timeout {
            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
            None => data_future.await?,
        };
        Ok(T::from(data))
    }

    /// Adds selectors used for filtering Inspect hierarchies.
    /// This may be called multiple times to add additional selectors.
    pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
    where
        T: Iterator<Item = S>,
        S: ToSelectorArguments,
    {
        for selector in selectors {
            self.add_selector(selector);
        }
        self
    }

    /// Requests a single component tree (or sub-tree).
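    ///
    /// A sketch with a hypothetical selector string:
    ///
    /// ```ignore
    /// let mut reader = ArchiveReader::inspect();
    /// reader.add_selector("core/example_component:root/some_node:some_property");
    /// ```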
    pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
        self.selectors.extend(selector.to_selector_arguments());
        self
    }

    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
    pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
        self.snapshot_shared::<Inspect>().await
    }
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
    Many(Vec<T>),
    One(T),
}

fn stream_batch<T>(
    iterator: Arc<BatchIteratorProxy>,
    process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
) -> impl Stream<Item = Result<T, Error>>
where
    T: for<'a> Deserialize<'a> + Send + 'static,
{
    stream! {
        loop {
            let next_batch = iterator
                .get_next()
                .await
                .map_err(Error::GetNextCall)?
                .map_err(Error::GetNextReaderError)?;
            if next_batch.is_empty() {
                // End of stream
                return;
            }
            for formatted_content in next_batch {
                let output = process_content(formatted_content)?;
                match output {
                    OneOrMany::One(data) => yield Ok(data),
                    OneOrMany::Many(datas) => {
                        for data in datas {
                            yield Ok(data);
                        }
                    }
                }
            }
        }
    }
}

fn drain_batch_iterator<T>(
    iterator: Arc<BatchIteratorProxy>,
) -> impl Stream<Item = Result<T, Error>>
where
    T: for<'a> Deserialize<'a> + Send + 'static,
{
    stream_batch(iterator, |formatted_content| match formatted_content {
        FormattedContent::Json(data) => {
            let mut buf = vec![0; data.size as usize];
            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            serde_json::from_slice(&buf).map_err(Error::ReadJson)
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Cbor(vmo) => {
            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
        }
        #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
        FormattedContent::Text(_) => unreachable!("We never expect Text"),
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
            unreachable!("Received unrecognized FIDL message")
        }
    })
}

fn drain_batch_iterator_for_logs(
    iterator: Arc<BatchIteratorProxy>,
) -> impl Stream<Item = Result<LogsData, Error>> {
    stream_batch::<LogsData>(iterator, |formatted_content| match formatted_content {
        FormattedContent::Json(data) => {
            let mut buf = vec![0; data.size as usize];
            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            serde_json::from_slice(&buf).map_err(Error::ReadJson)
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Fxt(vmo) => {
            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            let mut current_slice = buf.as_ref();
            let mut ret: Option<OneOrMany<LogsData>> = None;
            loop {
                let (data, remaining) = from_extended_record(current_slice).unwrap();
                ret = Some(match ret.take() {
                    Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
                    Some(OneOrMany::Many(mut many)) => {
                        many.push(data);
                        OneOrMany::Many(many)
                    }
                    None => OneOrMany::One(data),
                });
                if remaining.is_empty() {
                    break;
                }
                current_slice = remaining;
            }
            Ok(ret.unwrap())
        }
        #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
        FormattedContent::Text(_) => unreachable!("We never expect Text"),
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
            unreachable!("Received unrecognized FIDL message")
        }
    })
}

/// A subscription used for reading logs.
#[pin_project]
pub struct Subscription {
    #[pin]
    recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
    iterator: Arc<BatchIteratorProxy>,
}

const DATA_CHANNEL_SIZE: usize = 32;
const ERROR_CHANNEL_SIZE: usize = 2;

impl Subscription {
    /// Creates a new subscription stream to a batch iterator.
    /// The stream will return diagnostics data structures.
    pub fn new(iterator: BatchIteratorProxy) -> Self {
        let iterator = Arc::new(iterator);
        Subscription {
            recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
            iterator,
        }
    }

    /// Wait for the connection with the server to be established.
    pub async fn wait_for_ready(&self) {
        self.iterator.wait_for_ready().await.expect("doesn't disconnect");
    }

    /// Splits the subscription into two separate streams: results and errors.
    pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
        let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
        let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
        let _drain_task = fasync::Task::spawn(async move {
            while let Some(result) = self.next().await {
                match result {
                    Ok(value) => results_sender.send(value).await.ok(),
                    Err(e) => errors_sender.send(e).await.ok(),
                };
            }
        });
        (SubscriptionResultsStream { recv, _drain_task }, errors)
    }
}

impl Stream for Subscription {
    type Item = Result<LogsData, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.recv.poll_next(cx)
    }
}

impl FusedStream for Subscription {
    fn is_terminated(&self) -> bool {
        self.recv.is_terminated()
    }
}

/// A stream for reading diagnostics data
#[pin_project]
pub struct SubscriptionResultsStream<T> {
    #[pin]
    recv: mpsc::Receiver<T>,
    _drain_task: fasync::Task<()>,
}

impl<T> Stream for SubscriptionResultsStream<T>
where
    T: for<'a> Deserialize<'a>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.recv.poll_next(cx)
    }
}

impl<T> FusedStream for SubscriptionResultsStream<T>
where
    T: for<'a> Deserialize<'a>,
{
    fn is_terminated(&self) -> bool {
        self.recv.is_terminated()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_data_tree;
    use diagnostics_log::{Publisher, PublisherOptions};
    use fidl::endpoints::ServerEnd;
    use fuchsia_component_test::{
        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
    };
    use futures::TryStreamExt;
    use log::{error, info};
    use {fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_logger as flogger};

    const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";

    struct ComponentOptions {
        publish_n_trees: u64,
    }

    async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
        let builder = RealmBuilder::new().await?;
        let test_component = builder
            .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
            .await?;
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .from(Ref::parent())
                    .to(&test_component),
            )
            .await?;
        builder.init_mutable_config_to_empty(&test_component).await.unwrap();
        builder
            .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
            .await
            .unwrap();
        let instance = builder.build().await?;
        Ok(instance)
    }

    // All selectors in this test select against all tree names, in order to ensure the expected
    // number of trees are published
    #[fuchsia::test]
    async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
        let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
        let results = ArchiveReader::inspect()
            .add_selector(format!("{component_selector}:[...]root"))
            .snapshot()
            .await?;
        assert_eq!(results.len(), 1);
        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
            "tree-0": 0u64,
            int: 3u64,
            "lazy-node": {
                a: "test",
                child: {
                    double: 3.25,
                },
            }
        });
        // add_selector can take either a String or a Selector.
        let lazy_property_selector = Selector {
            component_selector: Some(fdiagnostics::ComponentSelector {
                moniker_segments: Some(vec![
                    fdiagnostics::StringSelector::ExactMatch(format!(
                        "realm_builder:{}",
                        instance.root.child_name()
                    )),
                    fdiagnostics::StringSelector::ExactMatch("test_component".into()),
                ]),
                ..Default::default()
            }),
            tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
                fdiagnostics::PropertySelector {
                    node_path: vec![
                        fdiagnostics::StringSelector::ExactMatch("root".into()),
                        fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
                    ],
                    target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
                },
            )),
            tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
            ..Default::default()
        };
        let int_property_selector = format!("{component_selector}:[...]root:int");
        let mut reader = ArchiveReader::inspect();
        reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
        let response = reader.snapshot().await?;
        assert_eq!(response.len(), 1);
        assert_eq!(response[0].moniker.to_string(), moniker);
        assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
            int: 3u64,
            "lazy-node": {
                a: "test"
            }
        });
        Ok(())
    }

    #[fuchsia::test]
    async fn select_all_for_moniker() {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 })
            .await
            .expect("component started");
        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
        let results = ArchiveReader::inspect()
            .select_all_for_component(moniker)
            .snapshot()
            .await
            .expect("snapshotted");
        assert_eq!(results.len(), 1);
        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
            "tree-0": 0u64,
            int: 3u64,
            "lazy-node": {
                a: "test",
                child: {
                    double: 3.25,
                },
            }
        });
    }

    #[fuchsia::test]
    async fn timeout() -> Result<(), anyhow::Error> {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;

        let mut reader = ArchiveReader::inspect();
        reader
            .add_selector(format!(
                "realm_builder\\:{}/test_component:root",
                instance.root.child_name()
            ))
            .with_timeout(zx::MonotonicDuration::from_nanos(0));
        let result = reader.snapshot().await;
        assert!(result.unwrap().is_empty());
        Ok(())
    }

    #[fuchsia::test]
    async fn component_selector() {
        let selector = ComponentSelector::new(vec!["a".to_string()]);
        assert_eq!(selector.moniker_str(), "a");
        let arguments: Vec<_> = selector.to_selector_arguments().collect();
        assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);

        let selector =
            ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
        assert_eq!(selector.moniker_str(), "b/c/a");

        let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
        let arguments: Vec<_> = selector.to_selector_arguments().collect();
        assert_eq!(
            arguments,
            vec![
                SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
                SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
            ]
        );
    }

    #[fuchsia::test]
    async fn custom_archive() {
        let proxy = spawn_fake_archive(serde_json::json!({
            "moniker": "moniker",
            "version": 1,
            "data_source": "Inspect",
            "metadata": {
              "component_url": "component-url",
              "timestamp": 0,
              "filename": "filename",
            },
            "payload": {
                "root": {
                    "x": 1,
                }
            }
        }));
        let result =
            ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
        assert_eq!(result.len(), 1);
        assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
    }

    #[fuchsia::test]
    async fn handles_lists_correctly_on_snapshot_raw() {
        let value = serde_json::json!({
            "moniker": "moniker",
            "version": 1,
            "data_source": "Inspect",
            "metadata": {
            "component_url": "component-url",
            "timestamp": 0,
            "filename": "filename",
            },
            "payload": {
                "root": {
                    "x": 1,
                }
            }
        });
        let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
        let mut reader = ArchiveReader::inspect();
        reader.with_archive(proxy);
        let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
        match json_result {
            serde_json::Value::Array(values) => {
                assert_eq!(values.len(), 1);
                assert_eq!(values[0], value);
            }
            result => panic!("unexpected result: {:?}", result),
        }
        let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
        match cbor_result {
            ciborium::Value::Array(values) => {
                assert_eq!(values.len(), 1);
                let json_result =
                    values[0].deserialized::<serde_json::Value>().expect("convert to json");
                assert_eq!(json_result, value);
            }
            result => panic!("unexpected result: {:?}", result),
        }
    }

    #[fuchsia::test(logging = false)]
    async fn snapshot_then_subscribe() {
        let (_instance, publisher, reader) = init_isolated_logging().await;
        let (mut stream, _errors) =
            reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
        log::set_boxed_logger(Box::new(publisher)).unwrap();
        info!("hello from test");
        error!("error from test");
        let log = stream.next().await.unwrap();
        assert_eq!(log.msg().unwrap(), "hello from test");
        let log = stream.next().await.unwrap();
        assert_eq!(log.msg().unwrap(), "error from test");
    }

    #[fuchsia::test]
    async fn read_many_trees_with_filtering() {
        let instance = start_component(ComponentOptions { publish_n_trees: 2 })
            .await
            .expect("component started");
        let selector = format!(
            "realm_builder\\:{}/test_component:[name=tree-0]root",
            instance.root.child_name()
        );
        let results = ArchiveReader::inspect()
            .add_selector(selector)
            // Only one schema since empty schemas are filtered out
            .with_minimum_schema_count(1)
            .snapshot()
            .await
            .expect("snapshotted");
        assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
        let should_have_data =
            results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
        assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
            "tree-0": 0u64,
        });
    }

    fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
        let (proxy, mut stream) =
            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
        fasync::Task::spawn(async move {
            while let Some(request) = stream.try_next().await.expect("stream request") {
                match request {
                    fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
                        result_stream,
                        ..
                    } => {
                        let data = data_to_send.clone();
                        fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
                    }
                    fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
                        let _ = responder.send();
                    }
                    fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
                        unreachable!("Unexpected method call");
                    }
                }
            }
        })
        .detach();
        proxy
    }

    async fn handle_batch_iterator(
        data: serde_json::Value,
        result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
    ) {
        let mut called = false;
        let mut stream = result_stream.into_stream();
        while let Some(req) = stream.try_next().await.expect("stream request") {
            match req {
                fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
                    let _ = responder.send();
                }
                fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
                    if called {
                        responder.send(Ok(Vec::new())).expect("send response");
                        continue;
                    }
                    called = true;
                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
                    let vmo_size = content.len() as u64;
                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
                    vmo.write(content.as_bytes(), 0).expect("write vmo");
                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
                    responder
                        .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
                        .expect("send response");
                }
                fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
                    unreachable!("Unexpected method call");
                }
            }
        }
    }

    async fn create_realm() -> RealmBuilder {
        let builder = RealmBuilder::new().await.expect("create realm builder");
        let archivist = builder
            .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
            .await
            .expect("add child archivist");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .capability(
                        Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
                            .optional(),
                    )
                    .capability(Capability::event_stream("stopped"))
                    .capability(Capability::event_stream("capability_requested"))
                    .from(Ref::parent())
                    .to(&archivist),
            )
            .await
            .expect("added routes from parent to archivist");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .from(&archivist)
                    .to(Ref::parent()),
            )
            .await
            .expect("routed LogSink from archivist to parent");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
                    .from_dictionary("diagnostics-accessors")
                    .from(&archivist)
                    .to(Ref::parent()),
            )
            .await
            .expect("routed ArchiveAccessor from archivist to parent");
        builder
    }

    async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
        let instance = create_realm().await.build().await.unwrap();
        let log_sink_proxy =
            instance.root.connect_to_protocol_at_exposed_dir::<flogger::LogSinkMarker>().unwrap();
        let accessor_proxy = instance
            .root
            .connect_to_protocol_at_exposed_dir::<fdiagnostics::ArchiveAccessorMarker>()
            .unwrap();
        let mut reader = ArchiveReader::logs();
        reader.with_archive(accessor_proxy);
        let options = PublisherOptions::default()
            .wait_for_initial_interest(false)
            .use_log_sink(log_sink_proxy);
        let publisher = Publisher::new(options).unwrap();
        (instance, publisher, reader)
    }

    #[fuchsia::test]
    fn retry_config_behavior() {
        let config = RetryConfig::MinSchemaCount(1);
        let got = 0;

        assert!(config.should_retry(got));

        let config = RetryConfig::MinSchemaCount(1);
        let got = 1;

        assert!(!config.should_retry(got));

        let config = RetryConfig::MinSchemaCount(1);
        let got = 2;

        assert!(!config.should_retry(got));

        let config = RetryConfig::MinSchemaCount(0);
        let got = 1;

        assert!(!config.should_retry(got));

        let config = RetryConfig::always();
        let got = 0;

        assert!(config.should_retry(got));

        let config = RetryConfig::never();
        let got = 0;

        assert!(!config.should_retry(got));
    }
}