// archivist_lib/accessor.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
use crate::error::AccessorError;
use crate::formatter::{
    FormattedStream, FxtPacketSerializer, JsonPacketSerializer, SerializedVmo, new_batcher,
};
use crate::inspect::ReaderServer;
use crate::inspect::repository::InspectRepository;
use crate::logs::repository::LogsRepository;
use crate::logs::shared_buffer::FxtMessage;
use crate::pipeline::Pipeline;
use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
use fidl::endpoints::{ControlHandle, RequestStream};
use fidl_fuchsia_diagnostics::{
    ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
    BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration,
    ComponentSelector, DataType, Format, FormattedContent, PerformanceConfiguration, Selector,
    SelectorArgument, StreamMode, StreamParameters, StringSelector, TreeSelector,
    TreeSelectorUnknown,
};
use fidl_fuchsia_mem::Buffer;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::StreamExt;
use futures::future::{Either, select};
use futures::prelude::*;
use futures::stream::Peekable;
use log::warn;
use selectors::FastError;
use serde::Serialize;
use std::collections::HashMap;
use std::pin::{Pin, pin};
use std::sync::Arc;
use thiserror::Error;
use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};
40
/// A configured timeout for retrieving a single batch of diagnostics data.
/// Non-positive values are interpreted as "no timeout".
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    /// Wraps a raw seconds value; values `<= 0` mean "wait forever".
    pub fn from_seconds(s: i64) -> Self {
        BatchRetrievalTimeout(s)
    }

    /// Effectively-infinite timeout, for tests.
    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    /// Effective timeout in seconds: the configured value when positive,
    /// otherwise `i64::MAX` (no practical limit).
    pub fn seconds(&self) -> i64 {
        match self.0 {
            s if s > 0 => s,
            _ => i64::MAX,
        }
    }
}
58
/// ArchiveAccessorServer represents an incoming connection from a client to an Archivist
/// instance, through which the client may make Reader requests to the various data
/// sources the Archivist offers.
pub struct ArchiveAccessorServer {
    // Source of Inspect snapshots served to readers.
    inspect_repository: Arc<InspectRepository>,
    // Source of log cursors served to readers.
    logs_repository: Arc<LogsRepository>,
    // Upper bound on simultaneously-snapshotted components per reader connection.
    maximum_concurrent_snapshots_per_reader: u64,
    // Async scope on which per-connection serving tasks are spawned.
    scope: fasync::Scope,
    // Batch timeout applied when a client does not specify one.
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}
69
70fn validate_and_parse_selectors(
71    selector_args: Vec<SelectorArgument>,
72) -> Result<Vec<Selector>, AccessorError> {
73    let mut selectors = vec![];
74    let mut errors = vec![];
75
76    if selector_args.is_empty() {
77        return Err(AccessorError::EmptySelectors);
78    }
79
80    for selector_arg in selector_args {
81        match selectors::take_from_argument::<FastError>(selector_arg) {
82            Ok(s) => selectors.push(s),
83            Err(e) => errors.push(e),
84        }
85    }
86
87    if !errors.is_empty() {
88        warn!(errors:?; "Found errors in selector arguments");
89    }
90
91    Ok(selectors)
92}
93
94fn validate_and_parse_log_selectors(
95    selector_args: Vec<SelectorArgument>,
96) -> Result<Vec<ComponentSelector>, AccessorError> {
97    // Only accept selectors of the type: `component:root` for logs for now.
98    let selectors = validate_and_parse_selectors(selector_args)?;
99    let mut component_selectors = Vec::with_capacity(selectors.len());
100    for selector in selectors {
101        // Unwrap safe: Previous validation discards any selector without a node.
102        let tree_selector = selector.tree_selector.as_ref().unwrap();
103        match tree_selector {
104            TreeSelector::PropertySelector(_) => {
105                return Err(AccessorError::InvalidLogSelector);
106            }
107            TreeSelector::SubtreeSelector(subtree_selector) => {
108                if subtree_selector.node_path.len() != 1 {
109                    return Err(AccessorError::InvalidLogSelector);
110                }
111                match &subtree_selector.node_path[0] {
112                    StringSelector::ExactMatch(val) if val == "root" => {}
113                    StringSelector::StringPattern(val) if val == "root" => {}
114                    _ => {
115                        return Err(AccessorError::InvalidLogSelector);
116                    }
117                }
118            }
119            TreeSelectorUnknown!() => {}
120        }
121        component_selectors.push(selector.component_selector.unwrap());
122    }
123    Ok(component_selectors)
124}
125
impl ArchiveAccessorServer {
    /// Create a new accessor for interacting with the archivist's data. The pipeline
    /// parameter determines which static configurations scope/restrict the visibility of
    /// data accessed by readers spawned by this accessor.
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

    /// Validates `params` and serves one diagnostics stream over `requests`.
    ///
    /// Inspect data requires `StreamMode::Snapshot`; logs accept JSON or FXT
    /// (CBOR is rejected). Returns an error for the caller to translate into
    /// an epitaph on the iterator channel.
    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                // Inspect reads are point-in-time snapshots only.
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                // `None` means "select everything"; `Some(..)` restricts to parsed selectors.
                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                // Divide any aggregate byte limit evenly across matched components.
                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                if format == Format::Cbor {
                    // CBOR is not supported for logs
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        validate_and_parse_log_selectors(selectors)?
                    }
                    // Empty selector list means "all components" for logs.
                    Some(ClientSelectorConfiguration::SelectAll(_)) => Vec::new(),
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        let logs = Box::pin(log_repo.logs_cursor_raw(mode, selectors));
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = Box::pin(log_repo.logs_cursor(mode, selectors));
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    // TODO(https://fxbug.dev/401548725): Remove this from the FIDL definition.
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

    /// Spawn an instance `fidl_fuchsia_diagnostics/Archive` that allows clients to open
    /// reader session to diagnostics data.
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        // Self isn't guaranteed to live into the exception handling of the async block. We need to clone self
        // to have a version that can be referenced in the exception handling.
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                // Store the batch iterator task so that we can ensure that the client finishes
                // draining items through it when a Controller#Stop call happens. For example,
                // this allows tests to fetch all isolated logs before finishing.
                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    // On error, close the iterator channel with an epitaph when a
                    // control handle is available (FIDL case; sockets have none).
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                        && let Some(control) = control_handle
                    {
                        e.close(control);
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}
327
/// Sink abstraction over the two transports results can be written to: a FIDL
/// `BatchIterator` channel or a raw zircon socket.
pub trait ArchiveAccessorWriter {
    /// Writes diagnostics data to the remote side.
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    /// Waits for a buffer to be available for writing into. For sockets, this is a no-op.
    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Takes the control handle from the FIDL stream (or returns None
    /// if the handle has already been taken, or if this is a socket).
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    /// Sends an on ready event. Default is a no-op for transports with no
    /// readiness handshake (sockets).
    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Waits for ZX_ERR_PEER_CLOSED
    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}
354
355fn get_buffer_from_formatted_content(
356    content: fidl_fuchsia_diagnostics::FormattedContent,
357) -> Result<Buffer, AccessorError> {
358    match content {
359        FormattedContent::Json(json) => Ok(json),
360        _ => Err(AccessorError::UnsupportedFormat),
361    }
362}
363
364impl ArchiveAccessorWriter for fuchsia_async::Socket {
365    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
366        if data.is_empty() {
367            return Err(IteratorError::PeerClosed);
368        }
369        let mut buf = vec![0];
370        for value in data {
371            let data = get_buffer_from_formatted_content(value)?;
372            buf.resize(data.size as usize, 0);
373            data.vmo.read(&mut buf, 0)?;
374            let res = self.write_all(&buf).await;
375            if res.is_err() {
376                // connection probably closed.
377                break;
378            }
379        }
380        Ok(())
381    }
382
383    async fn wait_for_close(&mut self) {
384        let _ = self.on_closed().await;
385    }
386}
387
/// Errors that can terminate writing to a batch iterator connection.
#[derive(Error, Debug)]
pub enum IteratorError {
    /// The peer closed its end of the connection (or sent an unknown method).
    #[error("Peer closed")]
    PeerClosed,
    /// FIDL transport-level failure.
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    /// Accessor-level failure surfaced while writing.
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    // This error should be unreachable. We should never
    // fail to read from a VMO that we created, but the type system
    // requires us to handle this.
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}
405
impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    /// Answers the next `GetNext` request with `data`. Any `WaitForReady`
    /// requests encountered first are acknowledged and skipped; an unknown
    /// method, stream error, or stream end terminates with `PeerClosed`/error.
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    /// If the next pending request is `WaitForReady`, consume it and respond;
    /// otherwise leave the stream untouched (peek does not consume).
    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    /// Blocks until the client has a request pending (i.e. is ready to receive
    /// a batch); a closed or errored stream is reported as `PeerClosed`.
    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    /// Returns the control handle for the underlying FIDL stream.
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    /// Resolves when the client end of the channel closes.
    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}
458
/// One incoming StreamDiagnostics request: the client-supplied stream
/// parameters plus the transport over which results will be served.
pub struct ArchiveIteratorRequest<R> {
    // Stream configuration provided by the client (data type, format, selectors, ...).
    parameters: StreamParameters,
    // Response transport: a BatchIterator request stream or a socket.
    iterator: R,
}
463
/// Translation trait used to support both remote and
/// local ArchiveAccessor implementations.
pub trait ArchiveAccessorTranslator {
    /// Transport over which diagnostics results are written back to the client.
    type InnerDataRequestChannel;
    /// Yields the next stream request, or `None` once the connection ends.
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}
472
473impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
474    type InnerDataRequestChannel = fuchsia_async::Socket;
475
476    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
477        match StreamExt::next(self).await {
478            Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
479                parameters,
480                responder,
481                stream,
482            })) => {
483                // It's fine for the client to send us a socket
484                // and discard the channel without waiting for a response.
485                // Future communication takes place over the socket so
486                // the client may opt to use this as an optimization.
487                let _ = responder.send();
488                Some(ArchiveIteratorRequest {
489                    iterator: fuchsia_async::Socket::from_socket(stream),
490                    parameters,
491                })
492            }
493            _ => None,
494        }
495    }
496}
497
498impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
499    type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;
500
501    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
502        loop {
503            match StreamExt::next(self).await {
504                Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
505                    control_handle: _,
506                    result_stream,
507                    stream_parameters,
508                })) => {
509                    return Some(ArchiveIteratorRequest {
510                        iterator: result_stream.into_stream().peekable(),
511                        parameters: stream_parameters,
512                    });
513                }
514                Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
515                    let _ = responder.send();
516                }
517                _ => return None,
518            }
519        }
520    }
521}
522struct SchemaTruncationCounter {
523    truncated_schemas: u64,
524    total_schemas: u64,
525}
526
527impl SchemaTruncationCounter {
528    pub fn new() -> Arc<Mutex<Self>> {
529        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
530    }
531}
532
/// Drives one client connection, draining an already-formatted data stream and
/// writing it out batch by batch until the data or the peer is exhausted.
pub(crate) struct BatchIterator<R> {
    // Transport the batches are written to (FIDL batch iterator or socket).
    requests: R,
    // Per-connection stats; connection open/close is recorded here.
    stats: Arc<BatchIteratorConnectionStats>,
    // Stream of serialized batches to serve.
    data: FormattedStream,
    // Set only by `new` (Inspect path); used to report truncation percentages.
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    // Trace id of the parent async duration, attached to emitted trace events.
    parent_trace_id: ftrace::Id,
}
540
541// Checks if a given schema is within a components budget, and if it is, updates the budget,
542// then returns true. Otherwise, if the schema is not within budget, returns false.
543fn maybe_update_budget(
544    budget_map: &mut HashMap<ExtendedMoniker, usize>,
545    moniker: &ExtendedMoniker,
546    bytes: usize,
547    byte_limit: usize,
548) -> bool {
549    if let Some(remaining_budget) = budget_map.get_mut(moniker) {
550        if *remaining_budget + bytes > byte_limit {
551            false
552        } else {
553            *remaining_budget += bytes;
554            true
555        }
556    } else if bytes > byte_limit {
557        budget_map.insert(moniker.clone(), 0);
558        false
559    } else {
560        budget_map.insert(moniker.clone(), bytes);
561        true
562    }
563}
564
impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
    /// Builds an iterator over structured `Data<D>` items (the Inspect path),
    /// serializing each item to a VMO in `format`. When a per-component byte
    /// budget is given, items that would exceed it have their payloads dropped
    /// and are re-serialized without them.
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        // Per-moniker bytes already admitted; shared by all serialized items.
        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    // Over budget: drop the payload and serialize the
                                    // metadata-only schema instead.
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
                                    d.drop_payload();
                                    // TODO(66085): If a payload is truncated, cache the
                                    // new schema so that we can reuse if other schemas from
                                    // the same component get dropped.
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

    /// Builds an iterator over raw FXT log messages, packing them into chunks
    /// capped at the aggregated content limit (or the default chunk target).
    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = FxtMessage> + Send + Unpin + 'static,
    {
        let data = FxtPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Builds an iterator over serializable items (the JSON logs path), packing
    /// them into JSON-array chunks of the default target size.
    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Common constructor: records the connection as open in `stats`
    /// (balanced by `close_connection` in `Drop`).
    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

    /// Serves the connection: waits for the client to be ready for a batch,
    /// pulls the next batch from the data stream, records stats, and writes it
    /// out. An exhausted data stream is sent as an empty (terminal) batch;
    /// peer closure at any point ends the loop.
    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                // An async duration cannot have multiple concurrent child async durations
                // so we include the nonce as metadata to manually determine relationship.
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            let batch = {
                let wait_for_close = pin!(self.requests.wait_for_close());
                let next_data = self.data.next();
                match select(next_data, wait_for_close).await {
                    // if we get None back, treat that as a terminal batch with an empty vec
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    // if the client closes the channel, stop waiting and terminate.
                    Either::Right(_) => break,
                }
            };

            // turn errors into epitaphs -- we drop intermediate items if there was an error midway
            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            // increment counters
            self.stats.add_response();
            if batch.is_empty() {
                // Terminal batch: report the final truncation percentage (Inspect only).
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                // Peer closed, end the stream.
                break;
            }
        }
        Ok(())
    }
}
773
// Keep the connection bookkeeping accurate: whenever a `BatchIterator` goes
// away -- client disconnect, error, or normal completion -- record the
// connection closure in its stats node.
impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        self.stats.close_connection();
    }
}
779
/// Resolved performance settings for a single reader connection, derived from
/// the client-supplied FIDL `StreamParameters` by [`PerformanceConfig::new`].
pub struct PerformanceConfig {
    /// Seconds allowed for retrieving a single batch.
    /// NOTE(review): a negative value appears to mean "no timeout"
    /// (`BatchRetrievalTimeout::max()` is `from_seconds(-1)`) -- confirm.
    pub batch_timeout_sec: i64,
    /// Optional cap, in bytes, on aggregated content size; `None` means no cap.
    pub aggregated_content_limit_bytes: Option<u64>,
    /// Maximum number of snapshots a single reader may have in flight at once.
    pub maximum_concurrent_snapshots_per_reader: u64,
}
785
786impl PerformanceConfig {
787    pub fn new(
788        params: &StreamParameters,
789        maximum_concurrent_snapshots_per_reader: u64,
790        default_batch_timeout_seconds: BatchRetrievalTimeout,
791    ) -> Result<PerformanceConfig, AccessorError> {
792        let batch_timeout = match params {
793            // If only nested batch retrieval timeout is definitely not set,
794            // use the optional outer field.
795            StreamParameters {
796                batch_retrieval_timeout_seconds,
797                performance_configuration: None,
798                ..
799            }
800            | StreamParameters {
801                batch_retrieval_timeout_seconds,
802                performance_configuration:
803                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
804                ..
805            } => batch_retrieval_timeout_seconds,
806            // If the outer field is definitely not set, and the inner field might be,
807            // use the inner field.
808            StreamParameters {
809                batch_retrieval_timeout_seconds: None,
810                performance_configuration:
811                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
812                ..
813            } => batch_retrieval_timeout_seconds,
814            // Both the inner and outer fields are set, which is an error.
815            _ => return Err(AccessorError::DuplicateBatchTimeout),
816        }
817        .map(BatchRetrievalTimeout::from_seconds)
818        .unwrap_or(default_batch_timeout_seconds);
819
820        let aggregated_content_limit_bytes = match params {
821            StreamParameters {
822                performance_configuration:
823                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
824                ..
825            } => *max_aggregate_content_size_bytes,
826            _ => None,
827        };
828
829        Ok(PerformanceConfig {
830            batch_timeout_sec: batch_timeout.seconds(),
831            aggregated_content_limit_bytes,
832            maximum_concurrent_snapshots_per_reader,
833        })
834    }
835}
836
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use crate::logs::shared_buffer::create_ring_buffer;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};

    // Log streams only accept `component:root`-style selectors; a selector
    // that names a node path or property inside the component closes the
    // batch iterator channel with INVALID_ARGS.
    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // A selector of the form `component:node/path:property` is rejected.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );
        // The StreamDiagnostics call itself succeeds; the rejection surfaces
        // as the iterator channel closing with INVALID_ARGS.
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

        // A selector of the form `component:root` is accepted.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.get_next().await.is_ok());
    }

    // An unparseable selector among valid ones is skipped: the connection
    // stays alive and the iterator keeps serving responses.
    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        // Send one unparseable selector ("invalid") alongside a valid
        // `component:root` selector; only the invalid entry should be dropped.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Inspect),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("invalid".to_string()),
                                SelectorArgument::RawSelector("valid:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        // The batch iterator proxy should remain valid and providing responses regardless of the
        // invalid selectors that were given.
        assert!(batch_iterator.get_next().await.is_ok());
    }

    // Waiting for a buffer twice in a row (without a write in between) must
    // not hang or error while a GetNext request is pending.
    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    // Writing to an iterator whose client end is gone surfaces
    // `IteratorError::PeerClosed` rather than panicking or blocking.
    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Json(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    // JSON content passes through `get_buffer_from_formatted_content`
    // unchanged: same size, same underlying VMO (checked via koid), same bytes.
    #[fuchsia::test]
    fn socket_writer_handles_json() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.koid().unwrap();
        let text = FormattedContent::Json(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    // CBOR content is not supported by the socket writer path and maps to
    // `AccessorError::UnsupportedFormat`.
    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    // Writing to a socket whose peer is already closed completes without an
    // error (best-effort write), and the socket then reports closure.
    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Json(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    // Even with a data source that never yields (`stream::pending`), dropping
    // the client end makes the server future complete with Ok(()) instead of
    // hanging.
    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        // Create a batch iterator that uses a hung stream to serve logs.
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

        // After sending a request, the request should be unfulfilled.
        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

        // After closing the client end of the channel, the server should terminate and release
        // resources.
        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    // A Subscribe-mode stream over all logs answers WaitForReady.
    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // Subscribe to all logs; no selector parsing is involved here.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Subscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::SelectAll(true)
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        // We receive a response for WaitForReady
        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}