heapdump_snapshot/
streamer.rsuse fidl_fuchsia_memory_heapdump_client as fheapdump_client;
use measure_tape_for_snapshot_element::Measurable;
use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
use crate::Error;
const FIDL_VECTOR_HEADER_BYTES: usize = 16;
const FIDL_HEADER_BYTES: usize = 16;
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
pub struct Streamer {
dest: fheapdump_client::SnapshotReceiverProxy,
buffer: Vec<fheapdump_client::SnapshotElement>,
buffer_size: usize,
}
impl Streamer {
pub fn new(dest: fheapdump_client::SnapshotReceiverProxy) -> Streamer {
Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
}
pub async fn push_element(
mut self,
elem: fheapdump_client::SnapshotElement,
) -> Result<Streamer, Error> {
let elem_size = elem.measure().num_bytes;
if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
self.flush_buffer().await?;
}
self.buffer.push(elem);
self.buffer_size += elem_size;
Ok(self)
}
pub async fn end_of_stream(mut self) -> Result<(), Error> {
if !self.buffer.is_empty() {
self.flush_buffer().await?;
}
self.flush_buffer().await?;
Ok(())
}
async fn flush_buffer(&mut self) -> Result<(), Error> {
let buffer = std::mem::replace(&mut self.buffer, Vec::new());
self.buffer_size = EMPTY_BUFFER_SIZE;
let fut = self.dest.batch(&buffer);
Ok(fut.await?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::endpoints::create_proxy_and_stream;
use fuchsia_async as fasync;
use maplit::hashmap;
use std::collections::HashMap;
use test_case::test_case;
use crate::snapshot::Snapshot;
fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
let mut result = HashMap::new();
let mut addr = 0;
for size in 0..1000000 {
result.insert(addr, size);
addr += size;
}
result
}
const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
const FAKE_THREAD_KOID: u64 = 8989;
const FAKE_THREAD_NAME: &str = "fake-thread-name";
const FAKE_THREAD_KEY: u64 = 1212;
const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
const FAKE_STACK_TRACE_KEY: u64 = 1234;
const FAKE_REGION_ADDRESS: u64 = 8192;
const FAKE_REGION_SIZE: u64 = 28672;
const FAKE_REGION_FILE_OFFSET: u64 = 4096;
const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];
#[test_case(hashmap! {} ; "empty")]
#[test_case(hashmap! { 1234 => 5678 } ; "only one")]
#[test_case(generate_one_million_allocations_hashmap() ; "one million")]
#[fasync::run_singlethreaded(test)]
async fn test_streamer(allocations: HashMap<u64, u64>) {
let (receiver_proxy, receiver_stream) =
create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
let mut streamer = Streamer::new(receiver_proxy)
.push_element(fheapdump_client::SnapshotElement::ThreadInfo(
fheapdump_client::ThreadInfo {
thread_info_key: Some(FAKE_THREAD_KEY),
koid: Some(FAKE_THREAD_KOID),
name: Some(FAKE_THREAD_NAME.to_string()),
..Default::default()
},
))
.await
.unwrap()
.push_element(fheapdump_client::SnapshotElement::StackTrace(
fheapdump_client::StackTrace {
stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
..Default::default()
},
))
.await
.unwrap()
.push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
fheapdump_client::ExecutableRegion {
address: Some(FAKE_REGION_ADDRESS),
size: Some(FAKE_REGION_SIZE),
file_offset: Some(FAKE_REGION_FILE_OFFSET),
build_id: Some(fheapdump_client::BuildId {
value: FAKE_REGION_BUILD_ID.to_vec(),
}),
..Default::default()
},
))
.await
.unwrap();
for (address, size) in &allocations {
streamer = streamer
.push_element(fheapdump_client::SnapshotElement::Allocation(
fheapdump_client::Allocation {
address: Some(*address),
size: Some(*size),
thread_info_key: Some(FAKE_THREAD_KEY),
stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
timestamp: Some(FAKE_TIMESTAMP),
..Default::default()
},
))
.await
.unwrap();
}
streamer.end_of_stream().await.unwrap();
let mut received_snapshot = receive_worker.await.unwrap();
for (address, size) in &allocations {
let allocation = received_snapshot.allocations.remove(address).unwrap();
assert_eq!(allocation.size, *size);
assert_eq!(allocation.thread_info.koid, FAKE_THREAD_KOID);
assert_eq!(allocation.thread_info.name, FAKE_THREAD_NAME);
assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
assert_eq!(allocation.timestamp, FAKE_TIMESTAMP);
}
assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
assert_eq!(region.size, FAKE_REGION_SIZE);
assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
}
}