archivist_lib/
accessor.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::constants::FORMATTED_CONTENT_CHUNK_SIZE_TARGET;
6use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
7use crate::error::AccessorError;
8use crate::formatter::{
9    FXTPacketSerializer, FormattedStream, JsonPacketSerializer, SerializedVmo, new_batcher,
10};
11use crate::inspect::ReaderServer;
12use crate::inspect::repository::InspectRepository;
13use crate::logs::container::CursorItem;
14use crate::logs::repository::LogsRepository;
15use crate::pipeline::Pipeline;
16use diagnostics_data::{Data, DiagnosticsData, ExtendedMoniker, Metadata};
17use fidl::endpoints::{ControlHandle, RequestStream};
18use fidl_fuchsia_diagnostics::{
19    ArchiveAccessorRequest, ArchiveAccessorRequestStream, BatchIteratorControlHandle,
20    BatchIteratorRequest, BatchIteratorRequestStream, ClientSelectorConfiguration, DataType,
21    Format, FormattedContent, PerformanceConfiguration, Selector, SelectorArgument, StreamMode,
22    StreamParameters, StringSelector, TreeSelector, TreeSelectorUnknown,
23};
24use fidl_fuchsia_mem::Buffer;
25use fuchsia_inspect::NumericProperty;
26use fuchsia_sync::Mutex;
27use futures::future::{Either, select};
28use futures::prelude::*;
29use futures::stream::Peekable;
30use futures::{StreamExt, pin_mut};
31use log::warn;
32use selectors::FastError;
33use serde::Serialize;
34use std::collections::HashMap;
35use std::pin::Pin;
36use std::sync::Arc;
37use thiserror::Error;
38use {fidl_fuchsia_diagnostics_host as fhost, fuchsia_async as fasync, fuchsia_trace as ftrace};
39
/// Timeout, in seconds, applied when retrieving a single batch of data.
///
/// A non-positive value means "no timeout" and is reported as `i64::MAX`.
#[derive(Debug, Copy, Clone)]
pub struct BatchRetrievalTimeout(i64);

impl BatchRetrievalTimeout {
    /// Builds a timeout from a raw second count; values <= 0 mean "wait forever".
    pub fn from_seconds(s: i64) -> Self {
        Self(s)
    }

    #[cfg(test)]
    pub fn max() -> Self {
        Self::from_seconds(-1)
    }

    /// Effective timeout in seconds: the configured value when positive,
    /// otherwise `i64::MAX` (effectively unbounded).
    pub fn seconds(&self) -> i64 {
        match self.0 {
            s if s > 0 => s,
            _ => i64::MAX,
        }
    }
}
57
/// ArchiveAccessorServer represents an incoming connection from a client to an Archivist
/// instance, through which the client may make Reader requests to the various data
/// sources the Archivist offers.
pub struct ArchiveAccessorServer {
    // Source of Inspect data for DataType::Inspect requests.
    inspect_repository: Arc<InspectRepository>,
    // Source of log data for DataType::Logs requests.
    logs_repository: Arc<LogsRepository>,
    // Forwarded into PerformanceConfig for each reader session.
    maximum_concurrent_snapshots_per_reader: u64,
    // Scope on which per-connection and per-request tasks are spawned.
    scope: fasync::Scope,
    // Batch timeout used when the client does not specify one.
    default_batch_timeout_seconds: BatchRetrievalTimeout,
}
68
69fn validate_and_parse_selectors(
70    selector_args: Vec<SelectorArgument>,
71) -> Result<Vec<Selector>, AccessorError> {
72    let mut selectors = vec![];
73    let mut errors = vec![];
74
75    if selector_args.is_empty() {
76        return Err(AccessorError::EmptySelectors);
77    }
78
79    for selector_arg in selector_args {
80        match selectors::take_from_argument::<FastError>(selector_arg) {
81            Ok(s) => selectors.push(s),
82            Err(e) => errors.push(e),
83        }
84    }
85
86    if !errors.is_empty() {
87        warn!(errors:?; "Found errors in selector arguments");
88    }
89
90    Ok(selectors)
91}
92
93fn validate_and_parse_log_selectors(
94    selector_args: Vec<SelectorArgument>,
95) -> Result<Vec<Selector>, AccessorError> {
96    // Only accept selectors of the type: `component:root` for logs for now.
97    let selectors = validate_and_parse_selectors(selector_args)?;
98    for selector in &selectors {
99        // Unwrap safe: Previous validation discards any selector without a node.
100        let tree_selector = selector.tree_selector.as_ref().unwrap();
101        match tree_selector {
102            TreeSelector::PropertySelector(_) => {
103                return Err(AccessorError::InvalidLogSelector);
104            }
105            TreeSelector::SubtreeSelector(subtree_selector) => {
106                if subtree_selector.node_path.len() != 1 {
107                    return Err(AccessorError::InvalidLogSelector);
108                }
109                match &subtree_selector.node_path[0] {
110                    StringSelector::ExactMatch(val) if val == "root" => {}
111                    StringSelector::StringPattern(val) if val == "root" => {}
112                    _ => {
113                        return Err(AccessorError::InvalidLogSelector);
114                    }
115                }
116            }
117            TreeSelectorUnknown!() => {}
118        }
119    }
120    Ok(selectors)
121}
122
impl ArchiveAccessorServer {
    /// Create a new accessor for interacting with the archivist's data. The pipeline
    /// parameter determines which static configurations scope/restrict the visibility of
    /// data accessed by readers spawned by this accessor.
    pub fn new(
        inspect_repository: Arc<InspectRepository>,
        logs_repository: Arc<LogsRepository>,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
        scope: fasync::Scope,
    ) -> Self {
        ArchiveAccessorServer {
            inspect_repository,
            logs_repository,
            maximum_concurrent_snapshots_per_reader,
            scope,
            default_batch_timeout_seconds,
        }
    }

    /// Serves a single StreamDiagnostics request: validates `params`, builds the
    /// appropriate data stream (Inspect or Logs, in the requested format) and
    /// drives a `BatchIterator` over `requests` until the data is exhausted or
    /// the peer goes away.
    ///
    /// Returns an `AccessorError` for invalid/unsupported parameter
    /// combinations, which the caller turns into an epitaph on the iterator
    /// channel.
    async fn spawn<R: ArchiveAccessorWriter + Send>(
        pipeline: Arc<Pipeline>,
        inspect_repo: Arc<InspectRepository>,
        log_repo: Arc<LogsRepository>,
        requests: R,
        params: StreamParameters,
        maximum_concurrent_snapshots_per_reader: u64,
        default_batch_timeout_seconds: BatchRetrievalTimeout,
    ) -> Result<(), AccessorError> {
        // Only JSON, CBOR and FXT are ever supported; per-data-type
        // restrictions are applied further down.
        let format = params.format.ok_or(AccessorError::MissingFormat)?;
        if !matches!(format, Format::Json | Format::Cbor | Format::Fxt) {
            return Err(AccessorError::UnsupportedFormat);
        }
        let mode = params.stream_mode.ok_or(AccessorError::MissingMode)?;

        let performance_config: PerformanceConfig = PerformanceConfig::new(
            &params,
            maximum_concurrent_snapshots_per_reader,
            default_batch_timeout_seconds,
        )?;

        // Single trace id correlates this request across nested async durations.
        let trace_id = ftrace::Id::random();
        match params.data_type.ok_or(AccessorError::MissingDataType)? {
            DataType::Inspect => {
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Inspect",
                    "trace_id" => u64::from(trace_id)
                );
                // Inspect only supports one-shot snapshots, not subscriptions.
                if !matches!(mode, StreamMode::Snapshot) {
                    return Err(AccessorError::UnsupportedMode);
                }
                let stats = Arc::new(pipeline.accessor_stats().new_inspect_batch_iterator());

                let selectors =
                    params.client_selector_configuration.ok_or(AccessorError::MissingSelectors)?;

                // `None` means "select everything"; `Some` holds the parsed allowlist.
                let selectors = match selectors {
                    ClientSelectorConfiguration::Selectors(selectors) => {
                        Some(validate_and_parse_selectors(selectors)?)
                    }
                    ClientSelectorConfiguration::SelectAll(_) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };

                let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
                let unpopulated_container_vec =
                    inspect_repo.fetch_inspect_data(&selectors, static_hierarchy_allowlist);

                // The aggregate byte limit is split evenly across all matched
                // components (None when there are no components to serve).
                let per_component_budget_opt = if unpopulated_container_vec.is_empty() {
                    None
                } else {
                    performance_config
                        .aggregated_content_limit_bytes
                        .map(|limit| (limit as usize) / unpopulated_container_vec.len())
                };

                if let Some(max_snapshot_size) = performance_config.aggregated_content_limit_bytes {
                    stats.global_stats().record_max_snapshot_size_config(max_snapshot_size);
                }
                BatchIterator::new(
                    ReaderServer::stream(
                        unpopulated_container_vec,
                        performance_config,
                        selectors,
                        Arc::clone(&stats),
                        trace_id,
                    ),
                    requests,
                    mode,
                    stats,
                    per_component_budget_opt,
                    trace_id,
                    format,
                )?
                .run()
                .await
            }
            DataType::Logs => {
                if format == Format::Cbor {
                    // CBOR is not supported for logs
                    return Err(AccessorError::UnsupportedFormat);
                }
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"ArchiveAccessorServer::spawn",
                    "data_type" => "Logs",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "trace_id" => u64::from(trace_id)
                );
                let stats = Arc::new(pipeline.accessor_stats().new_logs_batch_iterator());
                let selectors = match params.client_selector_configuration {
                    Some(ClientSelectorConfiguration::Selectors(selectors)) => {
                        Some(validate_and_parse_log_selectors(selectors)?)
                    }
                    Some(ClientSelectorConfiguration::SelectAll(_)) => None,
                    _ => return Err(AccessorError::InvalidSelectors("unrecognized selectors")),
                };
                match format {
                    Format::Fxt => {
                        // FXT serves raw log records without per-item JSON conversion.
                        let logs = log_repo.logs_cursor_raw(mode, selectors, trace_id);
                        BatchIterator::new_serving_fxt(
                            logs,
                            requests,
                            mode,
                            stats,
                            trace_id,
                            performance_config,
                        )?
                        .run()
                        .await?;
                        Ok(())
                    }
                    Format::Json => {
                        let logs = log_repo
                            .logs_cursor(mode, selectors, trace_id)
                            .map(move |inner: _| (*inner).clone());
                        BatchIterator::new_serving_arrays(logs, requests, mode, stats, trace_id)?
                            .run()
                            .await?;
                        Ok(())
                    }
                    // TODO(https://fxbug.dev/401548725): Remove this from the FIDL definition.
                    Format::Text => Err(AccessorError::UnsupportedFormat),
                    Format::Cbor => unreachable!("CBOR is not supported for logs"),
                }
            }
        }
    }

    /// Spawn an instance `fidl_fuchsia_diagnostics/Archive` that allows clients to open
    /// reader session to diagnostics data.
    pub fn spawn_server<RequestStream>(&self, pipeline: Arc<Pipeline>, mut stream: RequestStream)
    where
        RequestStream: ArchiveAccessorTranslator + Send + 'static,
        <RequestStream as ArchiveAccessorTranslator>::InnerDataRequestChannel:
            ArchiveAccessorWriter + Send,
    {
        // Self isn't guaranteed to live into the exception handling of the async block. We need to clone self
        // to have a version that can be referenced in the exception handling.
        let log_repo = Arc::clone(&self.logs_repository);
        let inspect_repo = Arc::clone(&self.inspect_repository);
        let maximum_concurrent_snapshots_per_reader = self.maximum_concurrent_snapshots_per_reader;
        let default_batch_timeout_seconds = self.default_batch_timeout_seconds;
        let scope = self.scope.to_handle();
        self.scope.spawn(async move {
            let stats = pipeline.accessor_stats();
            stats.global_stats.connections_opened.add(1);
            while let Some(request) = stream.next().await {
                // The control handle (when present) is used to epitaph the
                // iterator channel if serving the request fails.
                let control_handle = request.iterator.get_control_handle();
                stats.global_stats.stream_diagnostics_requests.add(1);
                let pipeline = Arc::clone(&pipeline);

                // Store the batch iterator task so that we can ensure that the client finishes
                // draining items through it when a Controller#Stop call happens. For example,
                // this allows tests to fetch all isolated logs before finishing.
                let inspect_repo_for_task = Arc::clone(&inspect_repo);
                let log_repo_for_task = Arc::clone(&log_repo);
                scope.spawn(async move {
                    if let Err(e) = Self::spawn(
                        pipeline,
                        inspect_repo_for_task,
                        log_repo_for_task,
                        request.iterator,
                        request.parameters,
                        maximum_concurrent_snapshots_per_reader,
                        default_batch_timeout_seconds,
                    )
                    .await
                        && let Some(control) = control_handle
                    {
                        e.close(control);
                    }
                });
            }
            pipeline.accessor_stats().global_stats.connections_closed.add(1);
        });
    }
}
326
/// Sink for formatted diagnostics batches produced by a `BatchIterator`.
///
/// Implemented for the channel-based FIDL `BatchIterator` stream and for raw
/// sockets (the `fuchsia.diagnostics.host` transport).
pub trait ArchiveAccessorWriter {
    /// Writes diagnostics data to the remote side.
    fn write(
        &mut self,
        results: Vec<FormattedContent>,
    ) -> impl Future<Output = Result<(), IteratorError>> + Send;

    /// Waits for a buffer to be available for writing into. For sockets, this is a no-op.
    fn wait_for_buffer(&mut self) -> impl Future<Output = anyhow::Result<()>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Takes the control handle from the FIDL stream (or returns None
    /// if the handle has already been taken, or if this is a socket).
    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        None
    }

    /// Sends an on ready event.
    fn maybe_respond_ready(&mut self) -> impl Future<Output = Result<(), AccessorError>> + Send {
        futures::future::ready(Ok(()))
    }

    /// Waits for ZX_ERR_PEER_CLOSED
    fn wait_for_close(&mut self) -> impl Future<Output = ()> + Send;
}
353
354fn get_buffer_from_formatted_content(
355    content: fidl_fuchsia_diagnostics::FormattedContent,
356) -> Result<Buffer, AccessorError> {
357    match content {
358        FormattedContent::Json(json) => Ok(json),
359        FormattedContent::Text(text) => Ok(text),
360        _ => Err(AccessorError::UnsupportedFormat),
361    }
362}
363
364impl ArchiveAccessorWriter for fuchsia_async::Socket {
365    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
366        if data.is_empty() {
367            return Err(IteratorError::PeerClosed);
368        }
369        let mut buf = vec![0];
370        for value in data {
371            let data = get_buffer_from_formatted_content(value)?;
372            buf.resize(data.size as usize, 0);
373            data.vmo.read(&mut buf, 0)?;
374            let res = self.write_all(&buf).await;
375            if res.is_err() {
376                // connection probably closed.
377                break;
378            }
379        }
380        Ok(())
381    }
382
383    async fn wait_for_close(&mut self) {
384        let _ = self.on_closed().await;
385    }
386}
387
/// Errors that can occur while delivering batches to a client.
#[derive(Error, Debug)]
pub enum IteratorError {
    /// The peer closed its end of the connection (or sent an unknown method).
    #[error("Peer closed")]
    PeerClosed,
    /// Transport-level FIDL failure.
    #[error(transparent)]
    Ipc(#[from] fidl::Error),
    /// Failure originating in accessor-side processing.
    #[error(transparent)]
    AccessorError(#[from] AccessorError),
    // This error should be unreachable. We should never
    // fail to read from a VMO that we created, but the type system
    // requires us to handle this.
    #[error("Error reading from VMO: {}", source)]
    VmoReadError {
        #[from]
        source: zx::Status,
    },
}
405
impl ArchiveAccessorWriter for Peekable<BatchIteratorRequestStream> {
    /// Waits for the client's next GetNext call and answers it with `data`.
    /// WaitForReady requests arriving in the meantime are acknowledged inline.
    async fn write(&mut self, data: Vec<FormattedContent>) -> Result<(), IteratorError> {
        loop {
            match self.next().await {
                Some(Ok(BatchIteratorRequest::GetNext { responder })) => {
                    responder.send(Ok(data))?;
                    return Ok(());
                }
                Some(Ok(BatchIteratorRequest::WaitForReady { responder })) => {
                    responder.send()?;
                }
                Some(Ok(BatchIteratorRequest::_UnknownMethod { method_type, ordinal, .. })) => {
                    warn!(method_type:?, ordinal; "Got unknown interaction on BatchIterator");
                    return Err(IteratorError::PeerClosed);
                }
                Some(Err(err)) => return Err(err.into()),
                None => {
                    return Err(IteratorError::PeerClosed);
                }
            }
        }
    }

    /// If the very next request is WaitForReady, consume it and acknowledge.
    /// Peeking first ensures a pending GetNext is left on the stream intact.
    async fn maybe_respond_ready(&mut self) -> Result<(), AccessorError> {
        let mut this = Pin::new(self);
        if matches!(this.as_mut().peek().await, Some(Ok(BatchIteratorRequest::WaitForReady { .. })))
        {
            let Some(Ok(BatchIteratorRequest::WaitForReady { responder })) = this.next().await
            else {
                unreachable!("We already checked the next request was WaitForReady");
            };
            responder.send()?;
        }
        Ok(())
    }

    /// Blocks until a request is pending (i.e. the client is ready to receive
    /// a batch) without consuming it; errors if the stream ended or failed.
    async fn wait_for_buffer(&mut self) -> anyhow::Result<()> {
        let this = Pin::new(self);
        match this.peek().await {
            Some(Ok(_)) => Ok(()),
            _ => Err(IteratorError::PeerClosed.into()),
        }
    }

    fn get_control_handle(&self) -> Option<BatchIteratorControlHandle> {
        Some(self.get_ref().control_handle())
    }

    async fn wait_for_close(&mut self) {
        let _ = self.get_ref().control_handle().on_closed().await;
    }
}
458
/// A single StreamDiagnostics request, normalized across transports: the
/// stream parameters plus the channel/socket the results are written to.
pub struct ArchiveIteratorRequest<R> {
    // Client-supplied parameters describing what data to stream and how.
    parameters: StreamParameters,
    // Transport-specific writer the results go to.
    iterator: R,
}
463
/// Translation trait used to support both remote and
/// local ArchiveAccessor implementations.
pub trait ArchiveAccessorTranslator {
    /// Transport used to deliver results for a single request.
    type InnerDataRequestChannel;

    /// Returns the next StreamDiagnostics request, or None when the
    /// connection has terminated.
    fn next(
        &mut self,
    ) -> impl Future<Output = Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>>> + Send;
}
472
473impl ArchiveAccessorTranslator for fhost::ArchiveAccessorRequestStream {
474    type InnerDataRequestChannel = fuchsia_async::Socket;
475
476    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
477        match StreamExt::next(self).await {
478            Some(Ok(fhost::ArchiveAccessorRequest::StreamDiagnostics {
479                parameters,
480                responder,
481                stream,
482            })) => {
483                // It's fine for the client to send us a socket
484                // and discard the channel without waiting for a response.
485                // Future communication takes place over the socket so
486                // the client may opt to use this as an optimization.
487                let _ = responder.send();
488                Some(ArchiveIteratorRequest {
489                    iterator: fuchsia_async::Socket::from_socket(stream),
490                    parameters,
491                })
492            }
493            _ => None,
494        }
495    }
496}
497
498impl ArchiveAccessorTranslator for ArchiveAccessorRequestStream {
499    type InnerDataRequestChannel = Peekable<BatchIteratorRequestStream>;
500
501    async fn next(&mut self) -> Option<ArchiveIteratorRequest<Self::InnerDataRequestChannel>> {
502        loop {
503            match StreamExt::next(self).await {
504                Some(Ok(ArchiveAccessorRequest::StreamDiagnostics {
505                    control_handle: _,
506                    result_stream,
507                    stream_parameters,
508                })) => {
509                    return Some(ArchiveIteratorRequest {
510                        iterator: result_stream.into_stream().peekable(),
511                        parameters: stream_parameters,
512                    });
513                }
514                Some(Ok(ArchiveAccessorRequest::WaitForReady { responder })) => {
515                    let _ = responder.send();
516                }
517                _ => return None,
518            }
519        }
520    }
521}
/// Running totals used to report the percentage of schemas that were
/// truncated to fit the per-component byte budget.
struct SchemaTruncationCounter {
    // Schemas whose payload was dropped due to budget overruns.
    truncated_schemas: u64,
    // All schemas processed so far.
    total_schemas: u64,
}

impl SchemaTruncationCounter {
    /// Creates a shared, zeroed counter.
    pub fn new() -> Arc<Mutex<Self>> {
        Arc::new(Mutex::new(Self { truncated_schemas: 0, total_schemas: 0 }))
    }
}
532
/// Drives a stream of serialized diagnostics data to a single client,
/// batch by batch, recording per-connection statistics along the way.
pub(crate) struct BatchIterator<R> {
    // Transport the batches are written to (FIDL channel or socket).
    requests: R,
    // Per-connection stats; opened in `new_inner`, closed on Drop.
    stats: Arc<BatchIteratorConnectionStats>,
    // Pre-batched stream of serialized VMOs.
    data: FormattedStream,
    // Present only for Inspect sessions that track payload truncation.
    truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
    // Trace id of the originating request, used to relate child durations.
    parent_trace_id: ftrace::Id,
}
540
541// Checks if a given schema is within a components budget, and if it is, updates the budget,
542// then returns true. Otherwise, if the schema is not within budget, returns false.
543fn maybe_update_budget(
544    budget_map: &mut HashMap<ExtendedMoniker, usize>,
545    moniker: &ExtendedMoniker,
546    bytes: usize,
547    byte_limit: usize,
548) -> bool {
549    if let Some(remaining_budget) = budget_map.get_mut(moniker) {
550        if *remaining_budget + bytes > byte_limit {
551            false
552        } else {
553            *remaining_budget += bytes;
554            true
555        }
556    } else if bytes > byte_limit {
557        budget_map.insert(moniker.clone(), 0);
558        false
559    } else {
560        budget_map.insert(moniker.clone(), bytes);
561        true
562    }
563}
564
impl<R> BatchIterator<R>
where
    R: ArchiveAccessorWriter + Send,
{
    /// Builds an iterator that serializes each `Data<D>` item into `format`,
    /// enforcing an optional per-component byte budget
    /// (`per_component_byte_limit_opt`) and recording truncation statistics.
    pub fn new<Items, D>(
        data: Items,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        per_component_byte_limit_opt: Option<usize>,
        parent_trace_id: ftrace::Id,
        format: Format,
    ) -> Result<Self, AccessorError>
    where
        Items: Stream<Item = Data<D>> + Send + 'static,
        D: DiagnosticsData + 'static,
    {
        let result_stats_for_fut = Arc::clone(&stats);

        // Maps each moniker to the bytes already spent on its schemas.
        let budget_tracker_shared = Arc::new(Mutex::new(HashMap::new()));

        let truncation_counter = SchemaTruncationCounter::new();
        let stream_owned_counter_for_fut = Arc::clone(&truncation_counter);

        let data = data.then(move |mut d| {
            let stream_owned_counter = Arc::clone(&stream_owned_counter_for_fut);
            let result_stats = Arc::clone(&result_stats_for_fut);
            let budget_tracker = Arc::clone(&budget_tracker_shared);
            async move {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"BatchIterator::new.serialize",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine relationship.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => d.moniker.to_string().as_ref()
                );
                let mut unlocked_counter = stream_owned_counter.lock();
                let mut tracker_guard = budget_tracker.lock();
                unlocked_counter.total_schemas += 1;
                if d.metadata.has_errors() {
                    result_stats.add_result_error();
                }

                match SerializedVmo::serialize(&d, D::DATA_TYPE, format) {
                    Err(e) => {
                        result_stats.add_result_error();
                        Err(e)
                    }
                    Ok(contents) => {
                        result_stats.add_result();
                        match per_component_byte_limit_opt {
                            Some(x) => {
                                if maybe_update_budget(
                                    &mut tracker_guard,
                                    &d.moniker,
                                    contents.size as usize,
                                    x,
                                ) {
                                    Ok(contents)
                                } else {
                                    // Over budget: drop the payload and re-serialize
                                    // the metadata-only schema.
                                    result_stats.add_schema_truncated();
                                    unlocked_counter.truncated_schemas += 1;
                                    d.drop_payload();
                                    // TODO(66085): If a payload is truncated, cache the
                                    // new schema so that we can reuse if other schemas from
                                    // the same component get dropped.
                                    SerializedVmo::serialize(&d, D::DATA_TYPE, format)
                                }
                            }
                            None => Ok(contents),
                        }
                    }
                }
            }
        });

        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            Some(truncation_counter),
            parent_trace_id,
        )
    }

    /// Builds an iterator serving log records serialized as FXT packets.
    /// The aggregate content limit, when set, doubles as the chunk size.
    pub fn new_serving_fxt<S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
        performance_config: PerformanceConfig,
    ) -> Result<Self, AccessorError>
    where
        S: Stream<Item = CursorItem> + Send + Unpin + 'static,
    {
        let data = FXTPacketSerializer::new(
            Arc::clone(&stats),
            performance_config
                .aggregated_content_limit_bytes
                .unwrap_or(FORMATTED_CONTENT_CHUNK_SIZE_TARGET),
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Builds an iterator serving serializable items packed into JSON arrays
    /// of roughly `FORMATTED_CONTENT_CHUNK_SIZE_TARGET` bytes.
    pub fn new_serving_arrays<D, S>(
        data: S,
        requests: R,
        mode: StreamMode,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError>
    where
        D: Serialize + Send + 'static,
        S: Stream<Item = D> + Send + Unpin + 'static,
    {
        let data = JsonPacketSerializer::new(
            Arc::clone(&stats),
            FORMATTED_CONTENT_CHUNK_SIZE_TARGET,
            data,
        );
        Self::new_inner(
            new_batcher(data, Arc::clone(&stats), mode),
            requests,
            stats,
            None,
            parent_trace_id,
        )
    }

    /// Common constructor; registers the connection in `stats` (balanced by
    /// `close_connection` in Drop).
    fn new_inner(
        data: FormattedStream,
        requests: R,
        stats: Arc<BatchIteratorConnectionStats>,
        truncation_counter: Option<Arc<Mutex<SchemaTruncationCounter>>>,
        parent_trace_id: ftrace::Id,
    ) -> Result<Self, AccessorError> {
        stats.open_connection();
        Ok(Self { data, requests, stats, truncation_counter, parent_trace_id })
    }

    /// Main serving loop: for each client request, pulls the next batch and
    /// writes it out, until the data ends, the peer closes, or an error makes
    /// its way up as an epitaph.
    pub async fn run(mut self) -> Result<(), AccessorError> {
        self.requests.maybe_respond_ready().await?;
        while self.requests.wait_for_buffer().await.is_ok() {
            self.stats.add_request();
            let start_time = zx::MonotonicInstant::get();
            let trace_id = ftrace::Id::random();
            let _trace_guard = ftrace::async_enter!(
                trace_id,
                TRACE_CATEGORY,
                c"BatchIterator::run.get_send_batch",
                // An async duration cannot have multiple concurrent child async durations
                // so we include the nonce as metadata to manually determine relationship.
                "parent_trace_id" => u64::from(self.parent_trace_id),
                "trace_id" => u64::from(trace_id)
            );
            let batch = {
                // Race the next batch against the client hanging up so we do
                // not keep working for a peer that is gone.
                let wait_for_close = self.requests.wait_for_close();
                let next_data = self.data.next();
                pin_mut!(wait_for_close);
                match select(next_data, wait_for_close).await {
                    // if we get None back, treat that as a terminal batch with an empty vec
                    Either::Left((batch_option, _)) => batch_option.unwrap_or_default(),
                    // if the client closes the channel, stop waiting and terminate.
                    Either::Right(_) => break,
                }
            };

            // turn errors into epitaphs -- we drop intermediate items if there was an error midway
            let batch = batch.into_iter().collect::<Result<Vec<_>, _>>()?;

            // increment counters
            self.stats.add_response();
            if batch.is_empty() {
                // An empty batch is terminal: report the final truncation
                // percentage (if tracked) before signaling termination.
                if let Some(truncation_count) = &self.truncation_counter {
                    let unlocked_count = truncation_count.lock();
                    if unlocked_count.total_schemas > 0 {
                        self.stats.global_stats().record_percent_truncated_schemas(
                            ((unlocked_count.truncated_schemas as f32
                                / unlocked_count.total_schemas as f32)
                                * 100.0)
                                .round() as u64,
                        );
                    }
                }
                self.stats.add_terminal();
            }
            self.stats
                .global_stats()
                .record_batch_duration(zx::MonotonicInstant::get() - start_time);
            if self.requests.write(batch).await.is_err() {
                // Peer closed, end the stream.
                break;
            }
        }
        Ok(())
    }
}
774
// Keep connection-count bookkeeping accurate: however a `BatchIterator` goes
// away (clean termination, error, or client disconnect), record the closed
// connection on its stats node.
impl<R> Drop for BatchIterator<R> {
    fn drop(&mut self) {
        self.stats.close_connection();
    }
}
780
/// Resolved per-connection performance limits, derived from the
/// client-supplied `StreamParameters` (see [`PerformanceConfig::new`]).
pub struct PerformanceConfig {
    /// Seconds to wait when retrieving a single batch of results.
    /// NOTE(review): negative values appear to mean "no timeout"
    /// (`BatchRetrievalTimeout::max()` is `from_seconds(-1)`) — confirm.
    pub batch_timeout_sec: i64,
    /// Optional cap, in bytes, on the total aggregated content size; taken
    /// from `PerformanceConfiguration.max_aggregate_content_size_bytes`.
    pub aggregated_content_limit_bytes: Option<u64>,
    /// Maximum number of snapshots a single reader may have in flight at once.
    pub maximum_concurrent_snapshots_per_reader: u64,
}
786
787impl PerformanceConfig {
788    pub fn new(
789        params: &StreamParameters,
790        maximum_concurrent_snapshots_per_reader: u64,
791        default_batch_timeout_seconds: BatchRetrievalTimeout,
792    ) -> Result<PerformanceConfig, AccessorError> {
793        let batch_timeout = match params {
794            // If only nested batch retrieval timeout is definitely not set,
795            // use the optional outer field.
796            StreamParameters {
797                batch_retrieval_timeout_seconds,
798                performance_configuration: None,
799                ..
800            }
801            | StreamParameters {
802                batch_retrieval_timeout_seconds,
803                performance_configuration:
804                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds: None, .. }),
805                ..
806            } => batch_retrieval_timeout_seconds,
807            // If the outer field is definitely not set, and the inner field might be,
808            // use the inner field.
809            StreamParameters {
810                batch_retrieval_timeout_seconds: None,
811                performance_configuration:
812                    Some(PerformanceConfiguration { batch_retrieval_timeout_seconds, .. }),
813                ..
814            } => batch_retrieval_timeout_seconds,
815            // Both the inner and outer fields are set, which is an error.
816            _ => return Err(AccessorError::DuplicateBatchTimeout),
817        }
818        .map(BatchRetrievalTimeout::from_seconds)
819        .unwrap_or(default_batch_timeout_seconds);
820
821        let aggregated_content_limit_bytes = match params {
822            StreamParameters {
823                performance_configuration:
824                    Some(PerformanceConfiguration { max_aggregate_content_size_bytes, .. }),
825                ..
826            } => *max_aggregate_content_size_bytes,
827            _ => None,
828        };
829
830        Ok(PerformanceConfig {
831            batch_timeout_sec: batch_timeout.seconds(),
832            aggregated_content_limit_bytes,
833            maximum_concurrent_snapshots_per_reader,
834        })
835    }
836}
837
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diagnostics::AccessorStats;
    use crate::logs::shared_buffer::create_ring_buffer;
    use assert_matches::assert_matches;
    use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, BatchIteratorMarker};
    use fuchsia_inspect::{Inspector, Node};
    use zx::AsHandleRef;

    // Log streams must reject selectors that name a node path and property
    // (`component:node/path:property`) — the iterator channel is closed with
    // INVALID_ARGS — while a bare `component:root` selector is accepted.
    #[fuchsia::test]
    async fn logs_only_accept_basic_component_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // A selector of the form `component:node/path:property` is rejected.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::SnapshotThenSubscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root/bar:baz".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );
        // The rejection surfaces on the iterator channel, not on the
        // StreamDiagnostics call itself.
        assert_matches!(
            batch_iterator.get_next().await,
            Err(fidl::Error::ClientChannelClosed { status: zx_status::Status::INVALID_ARGS, .. })
        );

        // A selector of the form `component:root` is accepted.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("foo:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        assert!(batch_iterator.get_next().await.is_ok());
    }

    // An Inspect stream that receives a mix of unparsable and valid selectors
    // keeps serving: the invalid entries are skipped rather than failing the
    // whole connection.
    #[fuchsia::test]
    async fn accessor_skips_invalid_selectors() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = Arc::new(ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        ));
        server.spawn_server(pipeline, stream);

        // Send one unparsable selector ("invalid") alongside a valid one.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Inspect),
                        stream_mode: Some(StreamMode::Snapshot),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::Selectors(vec![
                                SelectorArgument::RawSelector("invalid".to_string()),
                                SelectorArgument::RawSelector("valid:root".to_string()),
                            ])
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        // The batch iterator proxy should remain valid and providing responses regardless of the
        // invalid selectors that were given.
        assert!(batch_iterator.get_next().await.is_ok());
    }

    // Two back-to-back wait_for_buffer calls on a stream with a single
    // pending GetNext request must both succeed (the peeked request is not
    // consumed by waiting).
    #[fuchsia::test]
    async fn buffered_iterator_handles_two_consecutive_buffer_waits() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let _fut = client.get_next();
        let mut server = server.peekable();
        assert_matches!(server.wait_for_buffer().await, Ok(()));
        assert_matches!(server.wait_for_buffer().await, Ok(()));
    }

    // Writing to an iterator whose client end has been dropped reports
    // IteratorError::PeerClosed instead of panicking or hanging.
    #[fuchsia::test]
    async fn buffered_iterator_handles_peer_closed() {
        let (client, server) = fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        let mut server = server.peekable();
        drop(client);
        assert_matches!(
            server
                .write(vec![FormattedContent::Text(Buffer {
                    size: 1,
                    vmo: zx::Vmo::create(1).unwrap(),
                })])
                .await,
            Err(IteratorError::PeerClosed)
        );
    }

    // get_buffer_from_formatted_content passes a Text payload through
    // unchanged: same size, same underlying VMO (checked by koid), same bytes.
    #[fuchsia::test]
    fn socket_writer_handles_text() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let koid = vmo.get_koid().unwrap();
        let text = FormattedContent::Text(Buffer { size: 1, vmo });
        let result = get_buffer_from_formatted_content(text).unwrap();
        assert_eq!(result.size, 1);
        assert_eq!(result.vmo.get_koid().unwrap(), koid);
        let mut buffer = [0];
        result.vmo.read(&mut buffer, 0).unwrap();
        assert_eq!(buffer[0], 5);
    }

    // CBOR payloads are not supported by the socket writer path and must be
    // rejected with AccessorError::UnsupportedFormat.
    #[fuchsia::test]
    fn socket_writer_does_not_handle_cbor() {
        let vmo = zx::Vmo::create(1).unwrap();
        vmo.write(&[5u8], 0).unwrap();
        let text = FormattedContent::Cbor(vmo);
        let result = get_buffer_from_formatted_content(text);
        assert_matches!(result, Err(AccessorError::UnsupportedFormat));
    }

    // Writing to a socket whose read end is already closed completes without
    // error (best-effort write), and wait_for_close then resolves.
    #[fuchsia::test]
    async fn socket_writer_handles_closed_socket() {
        let (local, remote) = zx::Socket::create_stream();
        drop(local);
        let mut remote = fuchsia_async::Socket::from_socket(remote);
        {
            let result = ArchiveAccessorWriter::write(
                &mut remote,
                vec![FormattedContent::Text(Buffer { size: 1, vmo: zx::Vmo::create(1).unwrap() })],
            )
            .await;
            assert_matches!(result, Ok(()));
        }
        remote.wait_for_close().await;
    }

    // A subscribed BatchIterator backed by a data stream that never yields
    // must still terminate (returning Ok) once the client end disconnects.
    #[fuchsia::test]
    fn batch_iterator_terminates_on_client_disconnect() {
        let mut executor = fasync::TestExecutor::new();
        let (batch_iterator_proxy, stream) =
            fidl::endpoints::create_proxy_and_stream::<BatchIteratorMarker>();
        // Create a batch iterator that uses a hung stream to serve logs.
        let batch_iterator = BatchIterator::new(
            futures::stream::pending::<diagnostics_data::Data<diagnostics_data::Logs>>(),
            stream.peekable(),
            StreamMode::Subscribe,
            Arc::new(AccessorStats::new(Node::default()).new_inspect_batch_iterator()),
            None,
            ftrace::Id::random(),
            Format::Json,
        )
        .expect("create batch iterator");

        let mut batch_iterator_fut = batch_iterator.run().boxed();
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());

        // After sending a request, the request should be unfulfilled.
        let mut iterator_request_fut = batch_iterator_proxy.get_next();
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());
        assert!(executor.run_until_stalled(&mut batch_iterator_fut).is_pending());
        assert!(executor.run_until_stalled(&mut iterator_request_fut).is_pending());

        // After closing the client end of the channel, the server should terminate and release
        // resources.
        drop(iterator_request_fut);
        drop(batch_iterator_proxy);
        assert_matches!(executor.run_singlethreaded(&mut batch_iterator_fut), Ok(()));
    }

    // A Subscribe-mode log stream answers WaitForReady once the iterator is
    // set up, even before any data is requested.
    #[fuchsia::test]
    async fn batch_iterator_on_ready_is_called() {
        let scope = fasync::Scope::new();
        let (accessor, stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspector = Inspector::default();
        let log_repo = LogsRepository::new(
            create_ring_buffer(1_000_000),
            std::iter::empty(),
            inspector.root(),
            scope.new_child(),
        );
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], scope.new_child()));
        let server = ArchiveAccessorServer::new(
            inspect_repo,
            log_repo,
            4,
            BatchRetrievalTimeout::max(),
            scope.new_child(),
        );
        server.spawn_server(pipeline, stream);

        // Subscribe to all logs; selector validation is not the point here.
        let (batch_iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
        assert!(
            accessor
                .r#stream_diagnostics(
                    &StreamParameters {
                        data_type: Some(DataType::Logs),
                        stream_mode: Some(StreamMode::Subscribe),
                        format: Some(Format::Json),
                        client_selector_configuration: Some(
                            ClientSelectorConfiguration::SelectAll(true)
                        ),
                        ..Default::default()
                    },
                    server_end
                )
                .is_ok()
        );

        // We receive a response for WaitForReady
        assert!(batch_iterator.wait_for_ready().await.is_ok());
    }
}