// fake_archive_accessor/fake_archive_accessor.rs
mod archivist_accessor;
mod archivist_server;
use {
anyhow::{
bail,
Error,
},
archivist_accessor::ArchiveAccessor,
async_trait::async_trait,
fidl_fuchsia_diagnostics as diagnostics, fuchsia_async as fasync,
fuchsia_sync::Mutex,
futures::StreamExt,
std::collections::BTreeSet,
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
tracing::*,
};
/// Callbacks through which a [`FakeArchiveAccessor`] reports its progress to an observer.
///
/// Implementations must be `Send + Sync` because the accessor that owns the signaler is
/// shared through an `Arc` and served from a spawned task.
#[async_trait]
pub trait EventSignaler: Send + Sync {
/// Called for each diagnostics request once its selectors have been validated and
/// recorded, before any data is sent back.
async fn signal_fetch(&self);
/// Called when the accessor considers the exchange finished, for example when more
/// requests arrive than there are canned payloads to answer them with.
async fn signal_done(&self);
/// Called with a human-readable description of a failure encountered while serving.
async fn signal_error(&self, error: &str);
}
/// Fake `ArchiveAccessor` server that answers each request with the next canned payload
/// and remembers which selectors every request asked for.
pub struct FakeArchiveAccessor {
// Optional observer notified about fetches, completion and errors.
event_signaler: Option<Box<dyn EventSignaler>>,
// Canned payloads; request number `i` (0-based) is answered with `inspect_data[i]`.
inspect_data: Vec<String>,
// Index of the payload that the next request will receive.
next_data: AtomicUsize,
// One entry per handled request: the set of selector strings that request carried.
selectors_requested: Mutex<Vec<BTreeSet<String>>>,
}
impl FakeArchiveAccessor {
pub fn new(
inspect_data: &[String],
event_signaler: Option<Box<dyn EventSignaler>>,
) -> Arc<FakeArchiveAccessor> {
Arc::new(FakeArchiveAccessor {
inspect_data: inspect_data.to_vec(),
event_signaler,
next_data: AtomicUsize::new(0),
selectors_requested: Mutex::new(vec![]),
})
}
async fn handle_fidl_request(
&self,
request: diagnostics::ArchiveAccessorRequest,
) -> Result<(), Error> {
let (stream_parameters, result_stream) = match request {
diagnostics::ArchiveAccessorRequest::StreamDiagnostics {
stream_parameters,
result_stream,
control_handle: _,
} => (stream_parameters, result_stream),
diagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
unreachable!("Unexpected method call");
}
};
let selectors = ArchiveAccessor::validate_stream_request(stream_parameters)?;
self.selectors_requested.lock().push(
selectors
.into_iter()
.map(|s| {
selectors::selector_to_string(
&s,
selectors::SelectorDisplayOptions::never_wrap_in_quotes(),
)
})
.collect::<Result<BTreeSet<_>, Error>>()?,
);
if let Some(s) = self.event_signaler.as_ref() {
s.signal_fetch().await;
}
let data_index = self.next_data.fetch_add(1, Ordering::Relaxed);
if data_index >= self.inspect_data.len() {
if let Some(s) = self.event_signaler.as_ref() {
s.signal_done().await;
}
} else if let Err(problem) =
ArchiveAccessor::send(result_stream, &self.inspect_data[data_index]).await
{
if let Some(s) = self.event_signaler.as_ref() {
s.signal_done().await;
s.signal_error(&format!("{}", problem)).await;
}
error!("Problem in request: {}", problem);
return Err(problem);
}
Ok(())
}
pub async fn serve_stream(
&self,
mut request_stream: diagnostics::ArchiveAccessorRequestStream,
) -> Result<(), Error> {
loop {
match request_stream.next().await {
Some(Ok(request)) => self.handle_fidl_request(request).await?,
Some(Err(e)) => {
if let Some(s) = self.event_signaler.as_ref() {
s.signal_done().await;
}
bail!("{}", e);
}
None => break,
}
}
Ok(())
}
pub fn serve_async(self: Arc<Self>, stream: diagnostics::ArchiveAccessorRequestStream) {
fasync::Task::spawn(async move {
let result = self.serve_stream(stream).await;
if let Err(e) = result {
error!("Error while serving ArchiveAccessor: {:?}", e);
}
})
.detach();
}
pub fn get_selectors_requested(&self) -> Vec<BTreeSet<String>> {
self.selectors_requested.lock().clone()
}
}