heapdump_snapshot/
streamer.rs1use flex_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
11const FIDL_VECTOR_HEADER_BYTES: usize = 16;
14
15const FIDL_HEADER_BYTES: usize = 16;
18
19const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23pub struct Streamer<'a> {
25 dest: &'a mut fheapdump_client::SnapshotReceiverProxy,
26 buffer: Vec<fheapdump_client::SnapshotElement>,
27 buffer_size: usize,
28}
29
30impl<'a> Streamer<'a> {
31 pub fn new(dest: &'a mut fheapdump_client::SnapshotReceiverProxy) -> Streamer<'a> {
36 Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
37 }
38
39 pub async fn push_element(
44 mut self,
45 elem: fheapdump_client::SnapshotElement,
46 ) -> Result<Self, Error> {
47 let elem_size = elem.measure().num_bytes;
48
49 if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
51 self.flush_buffer().await?;
52 }
53
54 self.buffer.push(elem);
56 self.buffer_size += elem_size;
57
58 Ok(self)
59 }
60
61 pub async fn end_of_snapshot(mut self) -> Result<(), Error> {
63 if !self.buffer.is_empty() {
65 self.flush_buffer().await?;
66 }
67
68 self.flush_buffer().await?;
70
71 Ok(())
72 }
73
74 async fn flush_buffer(&mut self) -> Result<(), Error> {
75 let buffer = std::mem::replace(&mut self.buffer, Vec::new());
77 self.buffer_size = EMPTY_BUFFER_SIZE;
78
79 let fut = self.dest.batch(&buffer);
81 Ok(fut.await?)
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use fuchsia_async as fasync;
89 use maplit::hashmap;
90 use std::collections::HashMap;
91 use std::rc::Rc;
92 use test_case::test_case;
93
94 use crate::ThreadInfo;
95 use crate::snapshot::Snapshot;
96
97 fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
100 let mut result = HashMap::new();
101 let mut addr = 0;
102 for size in 0..1000000 {
103 result.insert(addr, size);
104 addr += size;
105 }
106 result
107 }
108
109 const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
110 const FAKE_THREAD_KOID: u64 = 8989;
111 const FAKE_THREAD_NAME: &str = "fake-thread-name";
112 const FAKE_THREAD_KEY: u64 = 1212;
113 const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
114 const FAKE_STACK_TRACE_KEY: u64 = 1234;
115 const FAKE_REGION_ADDRESS: u64 = 8192;
116 const FAKE_REGION_NAME: &str = "fake-region-name";
117 const FAKE_REGION_SIZE: u64 = 28672;
118 const FAKE_REGION_FILE_OFFSET: u64 = 4096;
119 const FAKE_REGION_VADDR: u64 = 12288;
120 const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];
121
122 #[test_case(hashmap! {} ; "empty")]
123 #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
124 #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
125 #[fasync::run_singlethreaded(test)]
126 async fn test_streamer(allocations: HashMap<u64, u64>) {
127 #[cfg(feature = "fdomain")]
128 let client = fdomain_local::local_client_empty();
129 #[cfg(not(feature = "fdomain"))]
130 let client = fidl::endpoints::ZirconClient;
131 let (mut receiver_proxy, receiver_stream) =
132 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
133 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
134
135 let mut streamer = Streamer::new(&mut receiver_proxy)
138 .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
139 fheapdump_client::ThreadInfo {
140 thread_info_key: Some(FAKE_THREAD_KEY),
141 koid: Some(FAKE_THREAD_KOID),
142 name: Some(FAKE_THREAD_NAME.to_string()),
143 ..Default::default()
144 },
145 ))
146 .await
147 .unwrap()
148 .push_element(fheapdump_client::SnapshotElement::StackTrace(
149 fheapdump_client::StackTrace {
150 stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
151 program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
152 ..Default::default()
153 },
154 ))
155 .await
156 .unwrap()
157 .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
158 fheapdump_client::ExecutableRegion {
159 address: Some(FAKE_REGION_ADDRESS),
160 name: Some(FAKE_REGION_NAME.to_string()),
161 size: Some(FAKE_REGION_SIZE),
162 file_offset: Some(FAKE_REGION_FILE_OFFSET),
163 vaddr: Some(FAKE_REGION_VADDR),
164 build_id: Some(fheapdump_client::BuildId {
165 value: FAKE_REGION_BUILD_ID.to_vec(),
166 }),
167 ..Default::default()
168 },
169 ))
170 .await
171 .unwrap();
172 for (address, size) in &allocations {
173 streamer = streamer
174 .push_element(fheapdump_client::SnapshotElement::Allocation(
175 fheapdump_client::Allocation {
176 address: Some(*address),
177 size: Some(*size),
178 thread_info_key: Some(FAKE_THREAD_KEY),
179 stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
180 timestamp: Some(FAKE_TIMESTAMP),
181 ..Default::default()
182 },
183 ))
184 .await
185 .unwrap();
186 }
187 streamer.end_of_snapshot().await.unwrap();
188
189 let mut received_snapshot = receive_worker.await.unwrap();
192 let mut received_allocations: HashMap<u64, &crate::snapshot::Allocation> =
193 received_snapshot
194 .allocations
195 .iter()
196 .map(|alloc| (alloc.address.unwrap(), alloc))
197 .collect();
198 for (address, size) in &allocations {
199 let allocation = received_allocations.remove(address).unwrap();
200
201 assert_eq!(allocation.size, *size);
202 assert_eq!(
203 allocation.thread_info,
204 Some(Rc::new(ThreadInfo {
205 koid: FAKE_THREAD_KOID,
206 name: FAKE_THREAD_NAME.to_owned()
207 }))
208 );
209 assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
210 assert_eq!(allocation.timestamp, Some(FAKE_TIMESTAMP));
211 }
212 assert!(received_allocations.is_empty(), "all the entries have been removed");
213 let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
214 assert_eq!(region.name, FAKE_REGION_NAME);
215 assert_eq!(region.size, FAKE_REGION_SIZE);
216 assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
217 assert_eq!(region.vaddr.unwrap(), FAKE_REGION_VADDR);
218 assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
219 assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
220 }
221}