use crate::{identity::ComponentIdentity, logs::container::LogsArtifactsContainer};
use fuchsia_sync::Mutex;
use futures::channel::mpsc;
use std::sync::{Arc, Weak};
use tracing::{debug, warn};
#[derive(Clone)]
pub struct BudgetManager {
state: Arc<Mutex<BudgetState>>,
}
impl BudgetManager {
pub fn new(capacity: usize, remover: mpsc::UnboundedSender<Arc<ComponentIdentity>>) -> Self {
Self {
state: Arc::new(Mutex::new(BudgetState {
remover,
capacity,
current: 0,
containers: vec![],
})),
}
}
pub fn add_container(&self, container: Arc<LogsArtifactsContainer>) {
self.state.lock().containers.push(container);
}
pub fn handle(&self) -> BudgetHandle {
BudgetHandle { state: Arc::downgrade(&self.state) }
}
pub fn terminate(&self) {
self.state.lock().terminate();
}
}
#[derive(Debug)]
struct BudgetState {
current: usize,
capacity: usize,
remover: mpsc::UnboundedSender<Arc<ComponentIdentity>>,
containers: Vec<Arc<LogsArtifactsContainer>>,
}
impl BudgetState {
fn allocate(&mut self, size: usize) {
self.current += size;
while self.current > self.capacity {
self.containers.sort_unstable_by_key(|c| c.oldest_timestamp().unwrap_or(i64::MAX));
let container_with_oldest = Arc::clone(
self.containers
.first()
.expect("containers are added to budget before they can call allocate"),
);
let oldest_message = container_with_oldest
.pop()
.expect("if we need to free space, we have messages to remove");
self.current -= oldest_message.size();
}
let mut i = 0;
while i != self.containers.len() {
if !self.containers[i].should_retain() {
let container = self.containers.remove(i);
container.terminate();
debug!(
identity = %container.identity,
"Removing now that we've popped the last message.");
self.remover.unbounded_send(Arc::clone(&container.identity)).unwrap_or_else(
|err| {
warn!(
%err,
identity = %container.identity, "Failed to send identity for removal");
},
);
} else {
i += 1;
}
}
}
fn terminate(&self) {
for container in &self.containers {
container.terminate();
}
}
}
#[derive(Debug)]
pub struct BudgetHandle {
state: Weak<Mutex<BudgetState>>,
}
impl BudgetHandle {
pub fn allocate(&self, size: usize) {
self.state.upgrade().expect("budgetmanager outlives all containers").lock().allocate(size);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
logs::{
multiplex::PinStream,
stored_message::{GenericStoredMessage, StructuredStoredMessage},
},
testing::TEST_IDENTITY,
};
use diagnostics_data::{LogsData, Severity};
use diagnostics_log_encoding::{
encode::Encoder, Argument, Record, Severity as StreamSeverity, Value,
};
use fidl_fuchsia_diagnostics::StreamMode;
use fuchsia_trace as ftrace;
use futures::{Stream, StreamExt};
use std::{
io::Cursor,
pin::Pin,
task::{Context, Poll},
};
struct CursorWrapper(PinStream<Arc<LogsData>>);
impl Stream for CursorWrapper {
type Item = Arc<LogsData>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.as_mut().poll_next(cx)
}
}
#[fuchsia::test]
async fn verify_container_is_terminated_on_removal() {
let (snd, _rcv) = mpsc::unbounded();
let manager = BudgetManager::new(128, snd);
let container_a = Arc::new(LogsArtifactsContainer::new(
TEST_IDENTITY.clone(),
std::iter::empty(),
fuchsia_inspect::component::inspector().root(),
manager.handle(),
));
let container_b = Arc::new(LogsArtifactsContainer::new(
TEST_IDENTITY.clone(),
std::iter::empty(),
fuchsia_inspect::component::inspector().root(),
manager.handle(),
));
manager.add_container(Arc::clone(&container_a));
manager.add_container(Arc::clone(&container_b));
assert_eq!(manager.state.lock().containers.len(), 2);
container_b.ingest_message(fake_message_bytes(1));
container_a.ingest_message(fake_message_bytes(2));
let mut cursor = CursorWrapper(
container_b.cursor(StreamMode::SnapshotThenSubscribe, ftrace::Id::random()),
);
assert_eq!(cursor.next().await, Some(Arc::new(fake_message(1))));
container_b.mark_stopped();
container_a.ingest_message(fake_message_bytes(3));
assert_eq!(manager.state.lock().containers.len(), 1);
assert_eq!(container_b.buffer().final_entry(), 1);
assert_eq!(cursor.next().await, None);
}
fn fake_message_bytes(timestamp: i64) -> GenericStoredMessage {
let record = Record {
timestamp,
severity: StreamSeverity::Debug.into_primitive(),
arguments: vec![
Argument { name: "pid".to_string(), value: Value::UnsignedInt(123) },
Argument { name: "tid".to_string(), value: Value::UnsignedInt(456) },
],
};
let mut buffer = Cursor::new(vec![0u8; 1024]);
let mut encoder = Encoder::new(&mut buffer);
encoder.write_record(&record).unwrap();
let encoded = &buffer.get_ref()[..buffer.position() as usize];
StructuredStoredMessage::create(encoded.to_vec(), Default::default())
}
fn fake_message(timestamp: i64) -> LogsData {
diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
timestamp_nanos: timestamp.into(),
component_url: Some(TEST_IDENTITY.url.to_string()),
moniker: TEST_IDENTITY.to_string(),
severity: Severity::Debug,
})
.set_pid(123)
.set_tid(456)
.build()
}
}