Skip to main content

diagnostics_reader/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! A library for reading Inspect and Log data from
8//! the ArchiveAccessor FIDL protocol.
9
10use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::{RustMessageFormatter, from_extended_record};
16use fidl_fuchsia_diagnostics::{
17    ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18    ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19    Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23#[cfg(fuchsia_api_level_at_least = "HEAD")]
24use fuchsia_sync::Mutex;
25use futures::channel::mpsc;
26use futures::prelude::*;
27use futures::sink::SinkExt;
28use futures::stream::FusedStream;
29use pin_project::pin_project;
30use serde::Deserialize;
31use std::future::ready;
32use std::marker::PhantomData;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::task::{Context, Poll};
36use thiserror::Error;
37use zx::{self as zx, MonotonicDuration};
38
39/// Alias for ArchiveReader<Logs>. Used for reading logs.
40pub type LogsArchiveReader = ArchiveReader<Logs>;
41
42/// Alias for ArchiveReader<Inspect>. Used for reading inspect.
43pub type InspectArchiveReader = ArchiveReader<Inspect>;
44
45pub use diagnostics_data::{Data, Inspect, Logs, Severity};
46pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
47
48const RETRY_DELAY_MS: i64 = 300;
49
50#[cfg(fuchsia_api_level_at_least = "HEAD")]
51const FORMAT: Format = Format::Cbor;
52#[cfg(fuchsia_api_level_less_than = "HEAD")]
53const FORMAT: Format = Format::Json;
54
55/// Errors that this library can return
56#[derive(Debug, Error)]
57pub enum Error {
58    /// Failed to connect to the archive accessor
59    #[error("Failed to connect to the archive accessor")]
60    ConnectToArchive(#[source] anyhow::Error),
61
62    /// Failed to create the BatchIterator channel ends
63    #[error("Failed to create the BatchIterator channel ends")]
64    CreateIteratorProxy(#[source] fidl::Error),
65
66    /// Failed to stream diagnostics from the accessor
67    #[error("Failed to stream diagnostics from the accessor")]
68    StreamDiagnostics(#[source] fidl::Error),
69
70    /// Failed to call iterator server
71    #[error("Failed to call iterator server")]
72    GetNextCall(#[source] fidl::Error),
73
74    /// Received error from the GetNext response
75    #[error("Received error from the GetNext response: {0:?}")]
76    GetNextReaderError(ReaderError),
77
78    /// Failed to read json received
79    #[error("Failed to read json received")]
80    ReadJson(#[source] serde_json::Error),
81
82    /// Failed to read cbor received
83    #[cfg(fuchsia_api_level_at_least = "HEAD")]
84    #[error("Failed to read cbor received")]
85    ReadCbor(#[source] anyhow::Error),
86
87    /// Failed to parse the diagnostics data from the json received
88    #[error("Failed to parse the diagnostics data from the json received")]
89    ParseDiagnosticsData(#[source] serde_json::Error),
90
91    /// Failed to read vmo from the response
92    #[error("Failed to read vmo from the response")]
93    ReadVmo(#[source] zx::Status),
94}
95
96/// An inspect tree selector for a component.
97pub struct ComponentSelector {
98    moniker: Vec<String>,
99    tree_selectors: Vec<String>,
100}
101
102impl ComponentSelector {
103    /// Create a new component event selector.
104    /// By default it will select the whole tree unless tree selectors are provided.
105    /// `moniker` is the realm path relative to the realm of the running component plus the
106    /// component name. For example: [a, b, component].
107    pub fn new(moniker: Vec<String>) -> Self {
108        Self { moniker, tree_selectors: Vec::new() }
109    }
110
111    /// Select a section of the inspect tree.
112    pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
113        self.tree_selectors.push(tree_selector.into());
114        self
115    }
116
117    fn moniker_str(&self) -> String {
118        self.moniker.join("/")
119    }
120}
121
122/// Trait used for things that can be converted to selector arguments.
123pub trait ToSelectorArguments {
124    /// Converts this to selector arguments.
125    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
126}
127
128/// Trait used for things that can be converted to component selector arguments.
129pub trait ToComponentSelectorArguments {
130    /// Converts this to selector arguments.
131    fn to_component_selector_arguments(self) -> ComponentSelector;
132}
133
134impl ToComponentSelectorArguments for &str {
135    fn to_component_selector_arguments(self) -> ComponentSelector {
136        if self.contains("\\:") {
137            // String is already escaped, don't escape it.
138            ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
139        } else {
140            // String isn't escaped, escape it
141            ComponentSelector::new(
142                selectors::sanitize_moniker_for_selectors(self)
143                    .split("/")
144                    .map(|value| value.to_string())
145                    .collect(),
146            )
147            .with_tree_selector("[...]root")
148        }
149    }
150}
151
152impl ToComponentSelectorArguments for String {
153    fn to_component_selector_arguments(self) -> ComponentSelector {
154        self.as_str().to_component_selector_arguments()
155    }
156}
157
158impl ToComponentSelectorArguments for ComponentSelector {
159    fn to_component_selector_arguments(self) -> ComponentSelector {
160        self
161    }
162}
163
164impl ToSelectorArguments for String {
165    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
166        Box::new([SelectorArgument::RawSelector(self)].into_iter())
167    }
168}
169
170impl ToSelectorArguments for &str {
171    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
172        Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
173    }
174}
175
176impl ToSelectorArguments for ComponentSelector {
177    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
178        let moniker = self.moniker_str();
179        // If not tree selectors were provided, select the full tree.
180        if self.tree_selectors.is_empty() {
181            Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
182        } else {
183            Box::new(
184                self.tree_selectors
185                    .into_iter()
186                    .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
187            )
188        }
189    }
190}
191
192impl ToSelectorArguments for Selector {
193    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
194        Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
195    }
196}
197
198/// Before unsealing this, consider whether your code belongs in this file.
199pub trait SerializableValue: private::Sealed {
200    /// The Format of this SerializableValue. Either Logs or Inspect.
201    const FORMAT_OF_VALUE: Format;
202}
203
204/// Trait used to verify that a JSON payload has a valid diagnostics payload.
205pub trait CheckResponse: private::Sealed {
206    /// Returns true if the response has a valid payload.
207    fn has_payload(&self) -> bool;
208}
209
210// The "sealed trait" pattern.
211//
212// https://rust-lang.github.io/api-guidelines/future-proofing.html
213mod private {
214    pub trait Sealed {}
215}
216impl private::Sealed for serde_json::Value {}
217impl private::Sealed for ciborium::Value {}
218impl<D: DiagnosticsData> private::Sealed for Data<D> {}
219
220impl<D: DiagnosticsData> CheckResponse for Data<D> {
221    fn has_payload(&self) -> bool {
222        self.payload.is_some()
223    }
224}
225
226impl SerializableValue for serde_json::Value {
227    const FORMAT_OF_VALUE: Format = Format::Json;
228}
229
230impl CheckResponse for serde_json::Value {
231    fn has_payload(&self) -> bool {
232        match self {
233            serde_json::Value::Object(obj) => {
234                obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
235            }
236            _ => false,
237        }
238    }
239}
240
241#[cfg(fuchsia_api_level_at_least = "HEAD")]
242impl SerializableValue for ciborium::Value {
243    const FORMAT_OF_VALUE: Format = Format::Cbor;
244}
245
246impl CheckResponse for ciborium::Value {
247    fn has_payload(&self) -> bool {
248        match self {
249            ciborium::Value::Map(m) => {
250                let payload_key = ciborium::Value::Text("payload".into());
251                m.iter().any(|(key, _)| *key == payload_key)
252            }
253            _ => false,
254        }
255    }
256}
257
258/// Retry configuration for ArchiveReader
259#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
260pub enum RetryConfig {
261    /// The minimum schema count required for a successful read.
262    /// This guarantees that a read will contain at least MinSchemaCount
263    /// results.
264    MinSchemaCount(usize),
265}
266
267impl RetryConfig {
268    /// Always retry
269    pub fn always() -> Self {
270        Self::MinSchemaCount(1)
271    }
272
273    /// Never retry
274    pub fn never() -> Self {
275        Self::MinSchemaCount(0)
276    }
277
278    /// Retry result_count times
279    fn should_retry(&self, result_count: usize) -> bool {
280        match self {
281            Self::MinSchemaCount(min) => *min > result_count,
282        }
283    }
284}
285
286/// A trait representing a type of diagnostics data.
287pub trait DiagnosticsDataType: private::Sealed {}
288
289impl private::Sealed for Logs {}
290
291impl private::Sealed for Inspect {}
292
293impl DiagnosticsDataType for Logs {}
294
295impl DiagnosticsDataType for Inspect {}
296
297/// Utility for reading inspect data of a running component using the injected Archive
298/// Reader service.
299pub struct ArchiveReader<T> {
300    archive: Option<ArchiveAccessorProxy>,
301    selectors: Vec<SelectorArgument>,
302    retry_config: RetryConfig,
303    timeout: Option<MonotonicDuration>,
304    batch_retrieval_timeout_seconds: Option<i64>,
305    max_aggregated_content_size_bytes: Option<u64>,
306    format: Option<Format>,
307    _phantom: PhantomData<T>,
308}
309
310impl<T: DiagnosticsDataType> ArchiveReader<T> {
311    /// Initializes the ArchiveReader with a custom connection to an ArchiveAccessor.
312    /// By default, the connection will be initialized by connecting to
313    /// fuchsia.diagnostics.ArchiveAccessor
314    pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
315        self.archive = Some(archive);
316        self
317    }
318
319    /// Sets the minimum number of schemas expected in a result in order for the
320    /// result to be considered a success.
321    pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
322        self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
323        self
324    }
325
326    /// Sets a custom retry configuration. By default we always retry.
327    pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
328        self.retry_config = config;
329        self
330    }
331
332    /// Sets the maximum time to wait for a response from the Archive.
333    /// Do not use in tests unless timeout is the expected behavior.
334    pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
335        self.timeout = Some(duration);
336        self
337    }
338
339    /// Filters logs for a specific component or component selector.
340    /// If string input, the string may be either a component selector string
341    /// or a moniker, or a ComponentSelector may be passed directly.
342    pub fn select_all_for_component(
343        &mut self,
344        component: impl ToComponentSelectorArguments,
345    ) -> &mut Self {
346        self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
347        self
348    }
349
350    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
351    async fn snapshot_shared<D>(&self, format: Format) -> Result<Vec<Data<D>>, Error>
352    where
353        D: DiagnosticsData + 'static,
354    {
355        let data_future = self.snapshot_inner::<D, Data<D>>(format);
356        let data = match self.timeout {
357            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
358            None => data_future.await?,
359        };
360        Ok(data)
361    }
362
363    async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
364    where
365        D: DiagnosticsData,
366        Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
367    {
368        loop {
369            let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
370            let result = drain_batch_iterator::<Y>(Arc::new(iterator))
371                .filter_map(|value| ready(value.ok()))
372                .collect::<Vec<_>>()
373                .await;
374
375            if self.retry_config.should_retry(result.len()) {
376                fasync::Timer::new(fasync::MonotonicInstant::after(
377                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
378                ))
379                .await;
380            } else {
381                return Ok(result);
382            }
383        }
384    }
385
386    fn batch_iterator<D>(
387        &self,
388        mode: StreamMode,
389        format: Format,
390    ) -> Result<BatchIteratorProxy, Error>
391    where
392        D: DiagnosticsData,
393    {
394        let archive = match &self.archive {
395            Some(archive) => archive.clone(),
396            None => client::connect_to_protocol::<ArchiveAccessorMarker>()
397                .map_err(Error::ConnectToArchive)?,
398        };
399
400        let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
401        let stream_parameters = StreamParameters {
402            stream_mode: Some(mode),
403            data_type: Some(D::DATA_TYPE),
404            format: Some(format),
405            client_selector_configuration: if self.selectors.is_empty() {
406                Some(ClientSelectorConfiguration::SelectAll(true))
407            } else {
408                Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
409            },
410            performance_configuration: Some(PerformanceConfiguration {
411                max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
412                batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
413                ..Default::default()
414            }),
415            ..Default::default()
416        };
417
418        archive
419            .stream_diagnostics(&stream_parameters, server_end)
420            .map_err(Error::StreamDiagnostics)?;
421        Ok(iterator)
422    }
423}
424
425impl ArchiveReader<Logs> {
426    /// Creates an ArchiveReader for reading logs
427    pub fn logs() -> Self {
428        ArchiveReader::<Logs> {
429            timeout: None,
430            format: None,
431            selectors: vec![],
432            retry_config: RetryConfig::always(),
433            archive: None,
434            batch_retrieval_timeout_seconds: None,
435            max_aggregated_content_size_bytes: None,
436            _phantom: PhantomData,
437        }
438    }
439
440    #[doc(hidden)]
441    pub fn with_format(&mut self, format: Format) -> &mut Self {
442        self.format = Some(format);
443        self
444    }
445
446    #[inline]
447    fn format(&self) -> Format {
448        match self.format {
449            Some(f) => f,
450            None => {
451                #[cfg(fuchsia_api_level_at_least = "HEAD")]
452                let ret = Format::LegacyFxt;
453                #[cfg(fuchsia_api_level_less_than = "HEAD")]
454                let ret = Format::Json;
455                ret
456            }
457        }
458    }
459
460    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
461    pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
462        loop {
463            let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
464            let result = drain_batch_iterator_for_logs(Arc::new(iterator), Some(self.format()))
465                .filter_map(|value| ready(value.ok()))
466                .collect::<Vec<_>>()
467                .await;
468            if self.retry_config.should_retry(result.len()) {
469                fasync::Timer::new(fasync::MonotonicInstant::after(
470                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
471                ))
472                .await;
473            } else {
474                return Ok(result);
475            }
476        }
477    }
478
479    /// Connects to the ArchiveAccessor and returns a stream of data containing a snapshot of the
480    /// current buffer in the Archivist as well as new data that arrives.
481    pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
482        let iterator =
483            self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
484        Ok(Subscription::new_with_format(iterator, self.format()))
485    }
486}
487
488impl ArchiveReader<Inspect> {
489    /// Creates an ArchiveReader for reading Inspect data.
490    pub fn inspect() -> Self {
491        ArchiveReader::<Inspect> {
492            timeout: None,
493            format: None,
494            selectors: vec![],
495            retry_config: RetryConfig::always(),
496            archive: None,
497            batch_retrieval_timeout_seconds: None,
498            max_aggregated_content_size_bytes: None,
499            _phantom: PhantomData,
500        }
501    }
502
503    /// Set the maximum time to wait for a wait for a single component
504    /// to have its diagnostics data "pumped".
505    pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
506        self.batch_retrieval_timeout_seconds = Some(timeout);
507        self
508    }
509
510    /// Sets the total number of bytes allowed in a single VMO read.
511    pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
512        self.max_aggregated_content_size_bytes = Some(limit_bytes);
513        self
514    }
515
516    /// Connects to the ArchiveAccessor and returns inspect data matching provided selectors.
517    /// Returns the raw json for each hierarchy fetched. This is used for CTF compatibility
518    /// tests (which test various implementation details of the JSON format),
519    /// and use beyond such tests is discouraged.
520    pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
521    where
522        T: for<'a> Deserialize<'a>
523            + SerializableValue
524            + From<Vec<T>>
525            + CheckResponse
526            + 'static
527            + Send,
528    {
529        let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
530        let data = match self.timeout {
531            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
532            None => data_future.await?,
533        };
534        Ok(T::from(data))
535    }
536
537    /// Adds selectors used for performing filtering inspect hierarchies.
538    /// This may be called multiple times to add additional selectors.
539    pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
540    where
541        T: Iterator<Item = S>,
542        S: ToSelectorArguments,
543    {
544        for selector in selectors {
545            self.add_selector(selector);
546        }
547        self
548    }
549
550    /// Requests a single component tree (or sub-tree).
551    pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
552        self.selectors.extend(selector.to_selector_arguments());
553        self
554    }
555
556    /// Sets the format to use when reading inspect data.
557    pub fn with_format(&mut self, format: Format) -> &mut Self {
558        self.format = Some(format);
559        self
560    }
561
562    #[inline]
563    fn format(&self) -> Format {
564        match self.format {
565            Some(f) => f,
566            None => FORMAT,
567        }
568    }
569
570    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
571    pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
572        self.snapshot_shared::<Inspect>(self.format()).await
573    }
574}
575
576#[derive(Debug, Deserialize)]
577#[serde(untagged)]
578enum OneOrMany<T> {
579    Many(Vec<T>),
580    One(T),
581}
582
583fn stream_batch<T>(
584    iterator: Arc<BatchIteratorProxy>,
585    process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
586) -> impl Stream<Item = Result<T, Error>>
587where
588    T: for<'a> Deserialize<'a> + Send + 'static,
589{
590    stream! {
591        loop {
592            let next_batch = iterator
593                .get_next()
594                .await
595                .map_err(Error::GetNextCall)?
596                .map_err(Error::GetNextReaderError)?;
597            if next_batch.is_empty() {
598                // End of stream
599                return;
600            }
601            for formatted_content in next_batch {
602                let output = process_content(formatted_content)?;
603                match output {
604                    OneOrMany::One(data) => yield Ok(data),
605                    OneOrMany::Many(datas) => {
606                        for data in datas {
607                            yield Ok(data);
608                        }
609                    }
610                }
611            }
612        }
613    }
614}
615
616/// Drain a batch iterator.
617pub fn drain_batch_iterator<T>(
618    iterator: Arc<BatchIteratorProxy>,
619) -> impl Stream<Item = Result<T, Error>>
620where
621    T: for<'a> Deserialize<'a> + Send + 'static,
622{
623    stream_batch(iterator, |formatted_content| match formatted_content {
624        FormattedContent::Json(data) => {
625            let mut buf = vec![0; data.size as usize];
626            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
627            serde_json::from_slice(&buf).map_err(Error::ReadJson)
628        }
629        #[cfg(fuchsia_api_level_at_least = "HEAD")]
630        FormattedContent::Cbor(vmo) => {
631            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
632            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
633            Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
634        }
635        #[cfg(fuchsia_api_level_at_least = "HEAD")]
636        FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
637        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
638            unreachable!("Received unrecognized FIDL message")
639        }
640    })
641}
642
643fn drain_batch_iterator_for_logs(
644    iterator: Arc<BatchIteratorProxy>,
645    _format: Option<Format>,
646) -> impl Stream<Item = Result<LogsData, Error>> {
647    #[cfg(fuchsia_api_level_at_least = "HEAD")]
648    let parser = Arc::new(Mutex::new(diagnostics_message::MessageParser::default()));
649    stream_batch::<LogsData>(iterator, move |formatted_content| match formatted_content {
650        FormattedContent::Json(data) => {
651            let mut buf = vec![0; data.size as usize];
652            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
653            serde_json::from_slice(&buf).map_err(Error::ReadJson)
654        }
655        #[cfg(fuchsia_api_level_at_least = "HEAD")]
656        FormattedContent::Fxt(vmo) => {
657            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
658            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
659            let mut current_slice: &[u8] = &buf;
660            let mut items = vec![];
661            let mut parser = parser.lock();
662
663            while !current_slice.is_empty() {
664                if _format == Some(Format::Fxt) {
665                    match parser.parse_next(current_slice, RustMessageFormatter) {
666                        Ok((maybe_data, remaining)) => {
667                            assert!(remaining.len() < current_slice.len(), "Parser must advance");
668                            if let Some(data) = maybe_data {
669                                items.push(data);
670                            }
671                            current_slice = remaining;
672                        }
673                        Err(_) => {
674                            // This can happen if we are reading a truncated record.
675                            // Stop parsing this buffer.
676                            break;
677                        }
678                    }
679                } else {
680                    match from_extended_record(current_slice) {
681                        Ok((data, remaining)) => {
682                            items.push(data);
683                            current_slice = remaining;
684                        }
685                        Err(_) => {
686                            // This can happen if we are reading a truncated record.
687                            // Stop parsing this buffer.
688                            break;
689                        }
690                    }
691                }
692            }
693            Ok(OneOrMany::Many(items))
694        }
695        #[cfg(fuchsia_api_level_at_least = "HEAD")]
696        FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
697        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
698            unreachable!("Received unrecognized FIDL message")
699        }
700    })
701}
702
703/// A subscription used for reading logs.
704#[pin_project]
705pub struct Subscription {
706    #[pin]
707    recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
708    iterator: Arc<BatchIteratorProxy>,
709}
710
711const DATA_CHANNEL_SIZE: usize = 32;
712const ERROR_CHANNEL_SIZE: usize = 2;
713
714impl Subscription {
715    /// Creates a new subscription stream to a batch iterator.
716    /// The stream will return diagnostics data structures.
717    pub fn new(iterator: BatchIteratorProxy) -> Self {
718        let iterator = Arc::new(iterator);
719        Subscription {
720            recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone(), None).fuse()),
721            iterator,
722        }
723    }
724
725    /// Creates a new subscription stream to a batch iterator.
726    /// The stream will return diagnostics data structures.
727    pub fn new_with_format(iterator: BatchIteratorProxy, format: Format) -> Self {
728        let iterator = Arc::new(iterator);
729        Subscription {
730            recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone(), Some(format)).fuse()),
731            iterator,
732        }
733    }
734
735    /// Wait for the connection with the server to be established.
736    pub async fn wait_for_ready(&self) {
737        self.iterator.wait_for_ready().await.expect("doesn't disconnect");
738    }
739
740    /// Splits the subscription into two separate streams: results and errors.
741    pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
742        let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
743        let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
744        let _drain_task = fasync::Task::spawn(async move {
745            while let Some(result) = self.next().await {
746                match result {
747                    Ok(value) => results_sender.send(value).await.ok(),
748                    Err(e) => errors_sender.send(e).await.ok(),
749                };
750            }
751        });
752        (SubscriptionResultsStream { recv, _drain_task }, errors)
753    }
754}
755
756impl Stream for Subscription {
757    type Item = Result<LogsData, Error>;
758
759    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
760        let this = self.project();
761        this.recv.poll_next(cx)
762    }
763}
764
765impl FusedStream for Subscription {
766    fn is_terminated(&self) -> bool {
767        self.recv.is_terminated()
768    }
769}
770
771/// A stream for reading diagnostics data
772#[pin_project]
773pub struct SubscriptionResultsStream<T> {
774    #[pin]
775    recv: mpsc::Receiver<T>,
776    _drain_task: fasync::Task<()>,
777}
778
779impl<T> Stream for SubscriptionResultsStream<T>
780where
781    T: for<'a> Deserialize<'a>,
782{
783    type Item = T;
784
785    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
786        let this = self.project();
787        this.recv.poll_next(cx)
788    }
789}
790
791impl<T> FusedStream for SubscriptionResultsStream<T>
792where
793    T: for<'a> Deserialize<'a>,
794{
795    fn is_terminated(&self) -> bool {
796        self.recv.is_terminated()
797    }
798}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803    use assert_matches::assert_matches;
804    use diagnostics_assertions::assert_data_tree;
805    use diagnostics_log::{Publisher, PublisherOptions};
806    use fidl::endpoints::ServerEnd;
807    use fidl_fuchsia_diagnostics as fdiagnostics;
808    use fuchsia_component_test::{
809        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
810    };
811    use futures::TryStreamExt;
812    use log::{error, info};
813
814    const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
815
816    struct ComponentOptions {
817        publish_n_trees: u64,
818    }
819
820    async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
821        let builder = RealmBuilder::new().await?;
822        let test_component = builder
823            .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
824            .await?;
825        builder
826            .add_route(
827                Route::new()
828                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
829                    .from(Ref::parent())
830                    .to(&test_component),
831            )
832            .await?;
833        builder.init_mutable_config_to_empty(&test_component).await.unwrap();
834        builder
835            .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
836            .await
837            .unwrap();
838        let instance = builder.build().await?;
839        Ok(instance)
840    }
841
842    // All selectors in this test select against all tree names, in order to ensure the expected
843    // number of trees are published
844    #[fuchsia::test]
845    async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
846        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
847        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
848        let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
849        let results = ArchiveReader::inspect()
850            .add_selector(format!("{component_selector}:[...]root"))
851            .snapshot()
852            .await?;
853        assert_eq!(results.len(), 1);
854        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
855            "tree-0": 0u64,
856            int: 3u64,
857            "lazy-node": {
858                a: "test",
859                child: {
860                    double: 3.25,
861                },
862            }
863        });
864        // add_selector can take either a String or a Selector.
865        let lazy_property_selector = Selector {
866            component_selector: Some(fdiagnostics::ComponentSelector {
867                moniker_segments: Some(vec![
868                    fdiagnostics::StringSelector::ExactMatch(format!(
869                        "realm_builder:{}",
870                        instance.root.child_name()
871                    )),
872                    fdiagnostics::StringSelector::ExactMatch("test_component".into()),
873                ]),
874                ..Default::default()
875            }),
876            tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
877                fdiagnostics::PropertySelector {
878                    node_path: vec![
879                        fdiagnostics::StringSelector::ExactMatch("root".into()),
880                        fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
881                    ],
882                    target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
883                },
884            )),
885            tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
886            ..Default::default()
887        };
888        let int_property_selector = format!("{component_selector}:[...]root:int");
889        let mut reader = ArchiveReader::inspect();
890        reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
891        let response = reader.snapshot().await?;
892        assert_eq!(response.len(), 1);
893        assert_eq!(response[0].moniker.to_string(), moniker);
894        assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
895            int: 3u64,
896            "lazy-node": {
897                a: "test"
898            }
899        });
900        Ok(())
901    }
902
903    #[fuchsia::test]
904    async fn select_all_for_moniker() {
905        let instance = start_component(ComponentOptions { publish_n_trees: 1 })
906            .await
907            .expect("component started");
908        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
909        let results = ArchiveReader::inspect()
910            .select_all_for_component(moniker)
911            .snapshot()
912            .await
913            .expect("snapshotted");
914        assert_eq!(results.len(), 1);
915        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
916            "tree-0": 0u64,
917            int: 3u64,
918            "lazy-node": {
919                a: "test",
920                child: {
921                    double: 3.25,
922                },
923            }
924        });
925    }
926
927    #[fuchsia::test]
928    async fn timeout() -> Result<(), anyhow::Error> {
929        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
930
931        let mut reader = ArchiveReader::inspect();
932        reader
933            .add_selector(format!(
934                "realm_builder\\:{}/test_component:root",
935                instance.root.child_name()
936            ))
937            .with_timeout(zx::MonotonicDuration::from_nanos(0));
938        let result = reader.snapshot().await;
939        assert!(result.unwrap().is_empty());
940        Ok(())
941    }
942
943    #[fuchsia::test]
944    async fn component_selector() {
945        let selector = ComponentSelector::new(vec!["a".to_string()]);
946        assert_eq!(selector.moniker_str(), "a");
947        let arguments: Vec<_> = selector.to_selector_arguments().collect();
948        assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
949
950        let selector =
951            ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
952        assert_eq!(selector.moniker_str(), "b/c/a");
953
954        let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
955        let arguments: Vec<_> = selector.to_selector_arguments().collect();
956        assert_eq!(
957            arguments,
958            vec![
959                SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
960                SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
961            ]
962        );
963    }
964
965    #[fuchsia::test]
966    async fn custom_archive() {
967        let proxy = spawn_fake_archive(serde_json::json!({
968            "moniker": "moniker",
969            "version": 1,
970            "data_source": "Inspect",
971            "metadata": {
972              "component_url": "component-url",
973              "timestamp": 0,
974              "filename": "filename",
975            },
976            "payload": {
977                "root": {
978                    "x": 1,
979                }
980            }
981        }));
982        let result =
983            ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
984        assert_eq!(result.len(), 1);
985        assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
986    }
987
988    #[fuchsia::test]
989    async fn handles_lists_correctly_on_snapshot_raw() {
990        let value = serde_json::json!({
991            "moniker": "moniker",
992            "version": 1,
993            "data_source": "Inspect",
994            "metadata": {
995            "component_url": "component-url",
996            "timestamp": 0,
997            "filename": "filename",
998            },
999            "payload": {
1000                "root": {
1001                    "x": 1,
1002                }
1003            }
1004        });
1005        let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
1006        let mut reader = ArchiveReader::inspect();
1007        reader.with_archive(proxy);
1008        let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
1009        match json_result {
1010            serde_json::Value::Array(values) => {
1011                assert_eq!(values.len(), 1);
1012                assert_eq!(values[0], value);
1013            }
1014            result => panic!("unexpected result: {result:?}"),
1015        }
1016        let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
1017        match cbor_result {
1018            ciborium::Value::Array(values) => {
1019                assert_eq!(values.len(), 1);
1020                let json_result =
1021                    values[0].deserialized::<serde_json::Value>().expect("convert to json");
1022                assert_eq!(json_result, value);
1023            }
1024            result => panic!("unexpected result: {result:?}"),
1025        }
1026    }
1027
1028    #[fuchsia::test(logging = false)]
1029    async fn snapshot_then_subscribe() {
1030        let (_instance, publisher, reader) = init_isolated_logging().await;
1031        let (mut stream, _errors) =
1032            reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
1033        publisher.register_logger(None).unwrap();
1034        info!("hello from test");
1035        error!("error from test");
1036        let log = stream.next().await.unwrap();
1037        assert_eq!(log.msg().unwrap(), "hello from test");
1038        let log = stream.next().await.unwrap();
1039        assert_eq!(log.msg().unwrap(), "error from test");
1040    }
1041
1042    #[fuchsia::test]
1043    async fn read_many_trees_with_filtering() {
1044        let instance = start_component(ComponentOptions { publish_n_trees: 2 })
1045            .await
1046            .expect("component started");
1047        let selector = format!(
1048            "realm_builder\\:{}/test_component:[name=tree-0]root",
1049            instance.root.child_name()
1050        );
1051        let results = ArchiveReader::inspect()
1052            .add_selector(selector)
1053            // Only one schema since empty schemas are filtered out
1054            .with_minimum_schema_count(1)
1055            .snapshot()
1056            .await
1057            .expect("snapshotted");
1058        assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1059        let should_have_data =
1060            results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1061        assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1062            "tree-0": 0u64,
1063        });
1064    }
1065
1066    fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1067        let (proxy, mut stream) =
1068            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1069        fasync::Task::spawn(async move {
1070            while let Some(request) = stream.try_next().await.expect("stream request") {
1071                match request {
1072                    fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1073                        result_stream,
1074                        ..
1075                    } => {
1076                        let data = data_to_send.clone();
1077                        fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1078                    }
1079                    fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1080                        let _ = responder.send();
1081                    }
1082                    fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1083                        unreachable!("Unexpected method call");
1084                    }
1085                }
1086            }
1087        })
1088        .detach();
1089        proxy
1090    }
1091
1092    async fn handle_batch_iterator(
1093        data: serde_json::Value,
1094        result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1095    ) {
1096        let mut called = false;
1097        let mut stream = result_stream.into_stream();
1098        while let Some(req) = stream.try_next().await.expect("stream request") {
1099            match req {
1100                fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1101                    let _ = responder.send();
1102                }
1103                fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1104                    if called {
1105                        responder.send(Ok(Vec::new())).expect("send response");
1106                        continue;
1107                    }
1108                    called = true;
1109                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
1110                    let vmo_size = content.len() as u64;
1111                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1112                    vmo.write(content.as_bytes(), 0).expect("write vmo");
1113                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1114                    responder
1115                        .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1116                        .expect("send response");
1117                }
1118                fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1119                    unreachable!("Unexpected method call");
1120                }
1121            }
1122        }
1123    }
1124
1125    async fn create_realm() -> RealmBuilder {
1126        let builder = RealmBuilder::new().await.expect("create realm builder");
1127        let archivist = builder
1128            .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1129            .await
1130            .expect("add child archivist");
1131        builder
1132            .add_route(
1133                Route::new()
1134                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1135                    .capability(
1136                        Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1137                            .optional(),
1138                    )
1139                    .capability(Capability::event_stream("stopped"))
1140                    .capability(Capability::event_stream("capability_requested"))
1141                    .from(Ref::parent())
1142                    .to(&archivist),
1143            )
1144            .await
1145            .expect("added routes from parent to archivist");
1146        builder
1147            .add_route(
1148                Route::new()
1149                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1150                    .from(&archivist)
1151                    .to(Ref::parent()),
1152            )
1153            .await
1154            .expect("routed LogSink from archivist to parent");
1155        builder
1156            .add_route(
1157                Route::new()
1158                    .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1159                    .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
1160                    .to(Ref::parent()),
1161            )
1162            .await
1163            .expect("routed ArchiveAccessor from archivist to parent");
1164        builder
1165    }
1166
1167    async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1168        let instance = create_realm().await.build().await.unwrap();
1169        let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1170        let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1171        let mut reader = ArchiveReader::logs();
1172        reader.with_archive(accessor_proxy);
1173        let options = PublisherOptions::default().use_log_sink(log_sink_client);
1174        let publisher = Publisher::new_async(options).await.unwrap();
1175        (instance, publisher, reader)
1176    }
1177
1178    #[fuchsia::test]
1179    fn retry_config_behavior() {
1180        let config = RetryConfig::MinSchemaCount(1);
1181        let got = 0;
1182
1183        assert!(config.should_retry(got));
1184
1185        let config = RetryConfig::MinSchemaCount(1);
1186        let got = 1;
1187
1188        assert!(!config.should_retry(got));
1189
1190        let config = RetryConfig::MinSchemaCount(1);
1191        let got = 2;
1192
1193        assert!(!config.should_retry(got));
1194
1195        let config = RetryConfig::MinSchemaCount(0);
1196        let got = 1;
1197
1198        assert!(!config.should_retry(got));
1199
1200        let config = RetryConfig::always();
1201        let got = 0;
1202
1203        assert!(config.should_retry(got));
1204
1205        let config = RetryConfig::never();
1206        let got = 0;
1207
1208        assert!(!config.should_retry(got));
1209    }
1210}