fake_archive_accessor/
fake_archive_accessor.rs
1mod archivist_accessor;
6mod archivist_server;
7
8use {
9 anyhow::{
10 bail,
11 Error,
13 },
14 archivist_accessor::ArchiveAccessor,
15 async_trait::async_trait,
16 fidl_fuchsia_diagnostics as diagnostics, fuchsia_async as fasync,
17 fuchsia_sync::Mutex,
18 futures::StreamExt,
19 log::*,
20 std::collections::BTreeSet,
21 std::sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26
27#[async_trait]
30pub trait EventSignaler: Send + Sync {
31 async fn signal_fetch(&self);
33 async fn signal_done(&self);
35 async fn signal_error(&self, error: &str);
37}
38
39pub struct FakeArchiveAccessor {
43 event_signaler: Option<Box<dyn EventSignaler>>,
44 inspect_data: Vec<String>,
45 next_data: AtomicUsize,
46 selectors_requested: Mutex<Vec<BTreeSet<String>>>,
48}
49
50impl FakeArchiveAccessor {
51 pub fn new(
56 inspect_data: &[String],
57 event_signaler: Option<Box<dyn EventSignaler>>,
58 ) -> Arc<FakeArchiveAccessor> {
59 Arc::new(FakeArchiveAccessor {
60 inspect_data: inspect_data.to_vec(),
61 event_signaler,
62 next_data: AtomicUsize::new(0),
63 selectors_requested: Mutex::new(vec![]),
64 })
65 }
66
67 async fn handle_fidl_request(
70 &self,
71 request: diagnostics::ArchiveAccessorRequest,
72 ) -> Result<(), Error> {
73 let (stream_parameters, result_stream) = match request {
74 diagnostics::ArchiveAccessorRequest::StreamDiagnostics {
75 stream_parameters,
76 result_stream,
77 control_handle: _,
78 } => (stream_parameters, result_stream),
79 diagnostics::ArchiveAccessorRequest::WaitForReady { .. }
80 | diagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
81 unreachable!("Unexpected method call");
82 }
83 };
84 let selectors = ArchiveAccessor::validate_stream_request(stream_parameters)?;
85 self.selectors_requested.lock().push(
86 selectors
87 .into_iter()
88 .map(|s| {
89 selectors::selector_to_string(
90 &s,
91 selectors::SelectorDisplayOptions::never_wrap_in_quotes(),
92 )
93 })
94 .collect::<Result<BTreeSet<_>, Error>>()?,
95 );
96 if let Some(s) = self.event_signaler.as_ref() {
97 s.signal_fetch().await;
98 }
99 let data_index = self.next_data.fetch_add(1, Ordering::Relaxed);
100 if data_index >= self.inspect_data.len() {
101 if let Some(s) = self.event_signaler.as_ref() {
104 s.signal_done().await;
105 }
106 } else if let Err(problem) =
107 ArchiveAccessor::send(result_stream, &self.inspect_data[data_index]).await
108 {
109 if let Some(s) = self.event_signaler.as_ref() {
110 s.signal_done().await;
111 s.signal_error(&format!("{}", problem)).await;
112 }
113 error!("Problem in request: {}", problem);
114 return Err(problem);
115 }
116 Ok(())
117 }
118
119 pub async fn serve_stream(
120 &self,
121 mut request_stream: diagnostics::ArchiveAccessorRequestStream,
122 ) -> Result<(), Error> {
123 loop {
124 match request_stream.next().await {
125 Some(Ok(request)) => self.handle_fidl_request(request).await?,
126 Some(Err(e)) => {
127 if let Some(s) = self.event_signaler.as_ref() {
128 s.signal_done().await;
129 }
130 bail!("{}", e);
131 }
132 None => break,
133 }
134 }
135 Ok(())
136 }
137
138 pub fn serve_async(self: Arc<Self>, stream: diagnostics::ArchiveAccessorRequestStream) {
139 fasync::Task::spawn(async move {
140 let result = self.serve_stream(stream).await;
141 if let Err(e) = result {
142 error!("Error while serving ArchiveAccessor: {:?}", e);
143 }
144 })
145 .detach();
146 }
147
148 pub fn get_selectors_requested(&self) -> Vec<BTreeSet<String>> {
149 self.selectors_requested.lock().clone()
150 }
151}