// fuchsia_inspect_auto_persist/lib.rs
use fidl_fuchsia_diagnostics_persist::PersistResult;
use futures::channel::mpsc;
use futures::{Future, StreamExt};
use injectable_time::{MonotonicInstant, TimeSource};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing::{error, info};
/// Sending half of the channel used to submit persistence requests.
/// Each message sent on it is a persistence tag, carried as a `String`.
pub type PersistenceReqSender = mpsc::Sender<String>;
/// Wraps a value of type `T` together with a persistence tag and a
/// [`PersistenceReqSender`].
///
/// Mutable access to the wrapped value goes through [`AutoPersist::get_mut`],
/// which hands out an [`AutoPersistGuard`]; when that guard is dropped it tries
/// to send the tag on the sender.
pub struct AutoPersist<T> {
/// The wrapped value that guards give access to.
inspect_node: T,
/// Tag sent on `persistence_req_sender` each time a guard is dropped.
persistence_tag: String,
/// Channel on which the tag is sent.
persistence_req_sender: PersistenceReqSender,
/// `true` while the most recent send attempt failed. Used so that the
/// failure and the later recovery are each logged only on the transition.
sender_is_blocked: Arc<AtomicBool>,
}
impl<T> AutoPersist<T> {
    /// Builds a wrapper around `inspect_node`.
    ///
    /// `persistence_tag` is copied into an owned `String`, and
    /// `persistence_req_sender` is stored for later use by the guards returned
    /// from [`AutoPersist::get_mut`]. The "sender is blocked" flag starts out
    /// as `false`.
    pub fn new(
        inspect_node: T,
        persistence_tag: &str,
        persistence_req_sender: PersistenceReqSender,
    ) -> Self {
        let persistence_tag = String::from(persistence_tag);
        let sender_is_blocked = Arc::new(AtomicBool::new(false));
        Self { inspect_node, persistence_tag, persistence_req_sender, sender_is_blocked }
    }

    /// Returns a guard that dereferences (mutably) to the wrapped value.
    ///
    /// The guard borrows the value, the tag and the sender from `self`, and
    /// shares the "sender is blocked" flag through a cloned `Arc`.
    pub fn get_mut(&mut self) -> AutoPersistGuard<'_, T> {
        let Self { inspect_node, persistence_tag, persistence_req_sender, sender_is_blocked } =
            self;
        AutoPersistGuard {
            inspect_node,
            persistence_tag: persistence_tag.as_str(),
            persistence_req_sender,
            sender_is_blocked: Arc::clone(sender_is_blocked),
        }
    }
}
/// Guard handed out by `AutoPersist::get_mut`.
///
/// It dereferences to the wrapped `T`. When dropped, it attempts to send
/// `persistence_tag` on `persistence_req_sender`.
pub struct AutoPersistGuard<'a, T> {
/// Mutable borrow of the wrapped value; the target of `Deref`/`DerefMut`.
inspect_node: &'a mut T,
/// Tag that is sent when this guard is dropped.
persistence_tag: &'a str,
/// Sender used for the send attempt made on drop.
persistence_req_sender: &'a mut PersistenceReqSender,
/// Shared flag recording whether the previous send attempt failed.
sender_is_blocked: Arc<AtomicBool>,
}
impl<T> Deref for AutoPersistGuard<'_, T> {
    type Target = T;

    /// Gives shared access to the guarded value.
    fn deref(&self) -> &T {
        &*self.inspect_node
    }
}
impl<T> DerefMut for AutoPersistGuard<'_, T> {
    /// Gives exclusive access to the guarded value.
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.inspect_node
    }
}
impl<T> Drop for AutoPersistGuard<'_, T> {
    /// Attempts a non-blocking send of the persistence tag.
    ///
    /// A failed send is logged as an error only when the previous attempt had
    /// not failed; a successful send is logged as a recovery only when the
    /// previous attempt had failed. This keeps repeated failures from
    /// producing one log line per dropped guard.
    fn drop(&mut self) {
        let request = self.persistence_tag.to_string();
        match self.persistence_req_sender.try_send(request) {
            Err(_) => {
                // Record the blocked state; only the false -> true transition logs.
                let was_blocked = self.sender_is_blocked.swap(true, Ordering::SeqCst);
                if !was_blocked {
                    error!("PersistenceReqSender dropped a persistence request: either buffer is full or no receiver is waiting");
                }
            }
            Ok(()) => {
                // Clear the blocked state; only the true -> false transition logs.
                let was_blocked = self.sender_is_blocked.swap(false, Ordering::SeqCst);
                if was_blocked {
                    info!("PersistenceReqSender recovered and resumed sending");
                }
            }
        }
    }
}
/// Wraps `log_fn` in a closure that forwards a message at most once per minute.
///
/// The returned closure reads the current time from `time_source` on every
/// call. The first call always forwards its message; later calls forward only
/// when at least one minute has elapsed since the last forwarded message.
/// Messages that arrive inside that window are discarded.
fn log_at_most_once_per_min_factory(
    time_source: impl TimeSource,
    mut log_fn: impl FnMut(String),
) -> impl FnMut(String) {
    // Time of the most recent forwarded message; `None` until the first one.
    let mut last_logged: Option<zx::MonotonicInstant> = None;
    move |message| {
        let now = zx::MonotonicInstant::from_nanos(time_source.now());
        let throttled = last_logged
            .is_some_and(|previous| now - previous < zx::MonotonicDuration::from_minutes(1));
        if throttled {
            return;
        }
        log_fn(message);
        last_logged = Some(now);
    }
}
/// Buffer size passed to `mpsc::channel` by `create_persistence_req_sender`.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// Creates a [`PersistenceReqSender`] plus the future that services it.
///
/// The returned future must be polled for requests to be forwarded. For each
/// tag received on the channel it calls `persist` on `persistence_proxy`.
/// A `PersistResult::Queued` response is treated as success; any other
/// response, or a failure of the call itself, is logged as an error. Those
/// error logs are rate-limited to at most one per minute.
///
/// The future finishes once the receiving end of the channel yields `None`.
pub fn create_persistence_req_sender(
    persistence_proxy: fidl_fuchsia_diagnostics_persist::DataPersistenceProxy,
) -> (PersistenceReqSender, impl Future<Output = ()>) {
    let (sender, mut receiver) = mpsc::channel::<String>(DEFAULT_BUFFER_SIZE);
    // `async move` takes ownership of `persistence_proxy`, so it is used
    // directly rather than through a clone.
    let fut = async move {
        let mut log_error =
            log_at_most_once_per_min_factory(MonotonicInstant::new(), |e| error!("{}", e));
        while let Some(tag_name) = receiver.next().await {
            match persistence_proxy.persist(&tag_name).await {
                Ok(PersistResult::Queued) => {}
                Ok(other) => log_error(format!(
                    "Persistence Service returned an error for tag {}: {:?}",
                    tag_name, other
                )),
                Err(e) => log_error(format!(
                    "Failed to send request to Persistence Service for tag {}: {}",
                    tag_name, e
                )),
            }
        }
    };
    (sender, fut)
}
// Unit tests for the guard's send-on-drop behavior, the request forwarding
// future, and the once-per-minute log throttle.
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints::create_proxy_and_stream;
use fidl_fuchsia_diagnostics_persist::DataPersistenceRequest;
use fuchsia_async as fasync;
use fuchsia_inspect::Inspector;
use futures::task::Poll;
use std::cell::RefCell;
use std::pin::pin;
use std::rc::Rc;
// Dropping a guard obtained from `get_mut` puts the tag on the channel.
#[fuchsia::test]
fn test_auto_persist() {
let (sender, mut receiver) = mpsc::channel::<String>(100);
let inspector = Inspector::default();
let node = inspector.root().create_child("node");
let mut auto_persist_node = AutoPersist::new(node, "some-tag", sender);
// No request is expected before any guard has been dropped.
assert!(receiver.try_next().is_err());
{
// The guard is dropped at the end of this scope.
let _guard = auto_persist_node.get_mut();
}
match receiver.try_next() {
Ok(Some(tag)) => assert_eq!(tag, "some-tag"),
_ => panic!("expect message in receiver"),
}
}
// A tag sent on the returned sender shows up as a `Persist` request on the
// server end of the proxy.
#[fuchsia::test]
fn test_create_persistence_req_sender() {
let mut exec = fasync::TestExecutor::new();
let (persistence_proxy, mut persistence_stream) =
create_proxy_and_stream::<fidl_fuchsia_diagnostics_persist::DataPersistenceMarker>();
let (mut req_sender, req_forwarder_fut) = create_persistence_req_sender(persistence_proxy);
let mut req_forwarder_fut = pin!(req_forwarder_fut);
// Before anything is sent, the forwarder is pending.
match exec.run_until_stalled(&mut req_forwarder_fut) {
Poll::Pending => (),
other => panic!("unexpected variant: {:?}", other),
};
// ...and the request stream has produced nothing.
match exec.run_until_stalled(&mut persistence_stream.next()) {
Poll::Pending => (),
other => panic!("unexpected variant: {:?}", other),
};
assert!(req_sender.try_send("some-tag".to_string()).is_ok());
// Poll the forwarder again after the send; it is still expected to be pending.
match exec.run_until_stalled(&mut req_forwarder_fut) {
Poll::Pending => (),
other => panic!("unexpected variant: {:?}", other),
};
// The request stream now yields a `Persist` request carrying the tag.
match exec.run_until_stalled(&mut persistence_stream.next()) {
Poll::Ready(Some(Ok(DataPersistenceRequest::Persist { tag, .. }))) => {
assert_eq!(tag, "some-tag")
}
other => panic!("unexpected variant: {:?}", other),
};
}
// `TimeSource` whose reported time is read from a cell shared with the test.
#[derive(Debug)]
struct FakeTimeSource {
now: Rc<RefCell<zx::MonotonicInstant>>,
}
impl TimeSource for FakeTimeSource {
fn now(&self) -> i64 {
self.now.borrow().into_nanos()
}
}
// The throttle forwards the first message and then nothing until a full
// minute has passed.
#[fuchsia::test]
fn test_log_at_most_once_per_min_factory() {
let log_count = Rc::new(RefCell::new(0));
let now = Rc::new(RefCell::new(zx::MonotonicInstant::from_nanos(0)));
let fake_time_source = FakeTimeSource { now: now.clone() };
let mut log =
log_at_most_once_per_min_factory(fake_time_source, |_| *log_count.borrow_mut() += 1);
// First call is forwarded.
log("message 1".to_string());
assert_eq!(*log_count.borrow(), 1);
// Same instant: suppressed.
log("message 2".to_string());
assert_eq!(*log_count.borrow(), 1);
{
*now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
}
// 30 seconds after the first log: still suppressed.
log("message 3".to_string());
assert_eq!(*log_count.borrow(), 1);
{
*now.borrow_mut() += zx::MonotonicDuration::from_seconds(30);
}
// 60 seconds after the first log: forwarded again.
log("message 3".to_string());
assert_eq!(*log_count.borrow(), 2);
}
}