stacktrack_snapshot/
streamer.rs1use flex_fuchsia_memory_stacktrack_client as fstacktrack_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
/// Number of bytes occupied by the header of a FIDL message.
const FIDL_HEADER_BYTES: usize = 16;

/// Number of bytes occupied by the header of a vector inside a FIDL message.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;

/// Size accounted for a batch message that carries no elements: the message header plus the
/// header of the (empty) element vector. The measured size of every buffered element is added on
/// top of this baseline.
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23pub struct Streamer<'a> {
25 dest: &'a mut fstacktrack_client::SnapshotReceiverProxy,
26 buffer: Vec<fstacktrack_client::SnapshotElement>,
27 buffer_size: usize,
28}
29
30impl<'a> Streamer<'a> {
31 pub fn new(dest: &'a mut fstacktrack_client::SnapshotReceiverProxy) -> Streamer<'a> {
36 Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
37 }
38
39 pub async fn push_element(
44 mut self,
45 elem: fstacktrack_client::SnapshotElement,
46 ) -> Result<Self, Error> {
47 let elem_size = elem.measure().num_bytes;
48
49 if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
51 self.flush_buffer().await?;
52 }
53
54 self.buffer.push(elem);
56 self.buffer_size += elem_size;
57
58 Ok(self)
59 }
60
61 pub async fn end_of_snapshot(mut self) -> Result<(), Error> {
63 if !self.buffer.is_empty() {
65 self.flush_buffer().await?;
66 }
67
68 self.flush_buffer().await?;
70
71 Ok(())
72 }
73
74 async fn flush_buffer(&mut self) -> Result<(), Error> {
75 let buffer = std::mem::replace(&mut self.buffer, Vec::new());
77 self.buffer_size = EMPTY_BUFFER_SIZE;
78
79 let fut = self.dest.batch(&buffer);
81 Ok(fut.await?)
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::create_client;
    use fuchsia_async as fasync;
    use std::collections::HashMap;
    use test_case::test_case;

    use crate::snapshot::Snapshot;

    // Values carried by the fixed elements sent at the beginning of every test snapshot.
    const FAKE_PAGE_SIZE: u64 = 4096;
    const FAKE_REGION_ADDRESS: u64 = 0x10000;
    const FAKE_REGION_SIZE: u64 = 0x2000;
    const FAKE_REGION_NAME: &str = "libexample.so";
    const FAKE_REGION_VADDR: u64 = 0x5000;
    const FAKE_REGION_BUILD_ID: &[u8] = &[0xAA, 0xBB, 0xCC, 0xDD];

    /// Builds `n` fake threads keyed by koid (1000, 1001, ...). Thread number `i` gets `i % 321`
    /// frames, each a `(program address, frame pointer)` pair derived from its indices, so that
    /// stack traces of many different lengths are produced.
    fn generate_fake_threads(n: usize) -> HashMap<u64, Vec<(u64, u64)>> {
        (0..n as u64)
            .map(|i| {
                let frames =
                    (0..(i % 321)).map(|j| ((i + j) ^ 0x1111, (i + j) ^ 0x2222)).collect();
                (1000 + i, frames)
            })
            .collect()
    }

    /// Streams a page size, one executable region and `num_threads` stack traces through a
    /// `Streamer`, then checks that the snapshot rebuilt on the receiving side matches.
    #[test_case(0)]
    #[test_case(1)]
    #[test_case(100000)]
    #[fasync::run_singlethreaded(test)]
    async fn test_streamer(num_threads: usize) {
        let fake_threads = generate_fake_threads(num_threads);

        // Receiving side: rebuilds a snapshot from the incoming batches in a local task.
        let client = create_client();
        let (mut receiver_proxy, receiver_stream) =
            client.create_proxy_and_stream::<fstacktrack_client::SnapshotReceiverMarker>();
        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));

        // Sending side: the fixed elements first, then one stack trace per fake thread.
        let page_size_elem = fstacktrack_client::SnapshotElement::PageSize(FAKE_PAGE_SIZE);
        let region_elem = fstacktrack_client::SnapshotElement::ExecutableRegion(
            fstacktrack_client::ExecutableRegion {
                address: Some(FAKE_REGION_ADDRESS),
                size: Some(FAKE_REGION_SIZE),
                name: Some(FAKE_REGION_NAME.to_string()),
                vaddr: Some(FAKE_REGION_VADDR),
                build_id: Some(fstacktrack_client::BuildId {
                    value: FAKE_REGION_BUILD_ID.to_vec(),
                }),
                ..Default::default()
            },
        );

        let mut streamer = Streamer::new(&mut receiver_proxy);
        streamer = streamer.push_element(page_size_elem).await.unwrap();
        streamer = streamer.push_element(region_elem).await.unwrap();
        for (koid, frames) in &fake_threads {
            let trace = fstacktrack_client::StackTrace {
                thread_koid: Some(*koid),
                frames: Some(
                    frames
                        .iter()
                        .map(|&(program_address, frame_pointer)| fstacktrack_client::CallFrame {
                            program_address,
                            frame_pointer,
                        })
                        .collect(),
                ),
                ..Default::default()
            };
            streamer = streamer
                .push_element(fstacktrack_client::SnapshotElement::StackTrace(trace))
                .await
                .unwrap();
        }
        streamer.end_of_snapshot().await.unwrap();

        // Verification: every element that was sent must show up in the received snapshot.
        let received_snapshot = receive_worker.await.unwrap();
        assert_eq!(received_snapshot.page_size, FAKE_PAGE_SIZE);

        // Index the received traces by koid; entries are removed as they are matched.
        let mut received_stack_traces: HashMap<u64, &crate::snapshot::StackTrace> = HashMap::new();
        for trace in received_snapshot.stack_traces.iter() {
            received_stack_traces.insert(trace.thread_koid, trace);
        }

        assert_eq!(received_snapshot.executable_regions.len(), 1);
        let region = &received_snapshot.executable_regions[0];
        assert_eq!(region.address, FAKE_REGION_ADDRESS);
        assert_eq!(region.size, FAKE_REGION_SIZE);
        assert_eq!(region.vaddr, FAKE_REGION_VADDR);
        assert_eq!(region.name, FAKE_REGION_NAME);
        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);

        for (koid, want_frames) in &fake_threads {
            let got_trace = received_stack_traces.remove(koid).unwrap();

            assert_eq!(got_trace.frames.len(), want_frames.len());
            for (got_frame, &(want_pc, want_fp)) in got_trace.frames.iter().zip(want_frames) {
                assert_eq!(got_frame.program_address, want_pc);
                assert_eq!(got_frame.frame_pointer, want_fp);
            }
        }
        assert!(received_stack_traces.is_empty(), "all the entries have been removed");
    }
}