1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
89use crate::Error;
/// Number of bytes occupied by the header of a FIDL message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_HEADER_BYTES: usize = 16;

/// Number of bytes occupied by the header of a vector inside a FIDL message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;

/// Size of the fixed part of a `SnapshotReceiver/Batch` FIDL message, i.e. the message header
/// followed by the header of the vector of elements.
///
/// The size of an actual batch is this value plus the size of each element it carries.
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
2223/// Implements pagination on top of a SnapshotReceiver channel.
24pub struct Streamer {
25 dest: fheapdump_client::SnapshotReceiverProxy,
26 buffer: Vec<fheapdump_client::SnapshotElement>,
27 buffer_size: usize,
28}
2930impl Streamer {
31pub fn new(dest: fheapdump_client::SnapshotReceiverProxy) -> Streamer {
32 Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
33 }
3435/// Sends the given `elem`.
36 ///
37 /// This method internally flushes the outgoing buffer, if necessary, so that it never exceeds
38 /// the maximum allowed size.
39pub async fn push_element(
40mut self,
41 elem: fheapdump_client::SnapshotElement,
42 ) -> Result<Streamer, Error> {
43let elem_size = elem.measure().num_bytes;
4445// Flush the current buffer if the new element would not fit in it.
46if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
47self.flush_buffer().await?;
48 }
4950// Append the new element.
51self.buffer.push(elem);
52self.buffer_size += elem_size;
5354Ok(self)
55 }
5657/// Sends the end-of-stream marker.
58pub async fn end_of_stream(mut self) -> Result<(), Error> {
59// Send the last elements in the queue.
60if !self.buffer.is_empty() {
61self.flush_buffer().await?;
62 }
6364// Send an empty batch to signal the end of the stream.
65self.flush_buffer().await?;
6667Ok(())
68 }
6970async fn flush_buffer(&mut self) -> Result<(), Error> {
71// Read and reset the buffer.
72let buffer = std::mem::replace(&mut self.buffer, Vec::new());
73self.buffer_size = EMPTY_BUFFER_SIZE;
7475// Send it.
76let fut = self.dest.batch(&buffer);
77Ok(fut.await?)
78 }
79}
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fuchsia_async as fasync;
    use maplit::hashmap;
    use std::collections::HashMap;
    use test_case::test_case;

    use crate::snapshot::Snapshot;

    /// Number of entries produced by `generate_one_million_allocations_hashmap`.
    const ONE_MILLION: u64 = 1_000_000;

    /// Generates an allocation hash map (address => size) with a huge number of entries, to test
    /// that pagination splits them properly.
    ///
    /// Sizes start at 1 and each allocation begins where the previous one ends, so every entry
    /// gets a distinct address and the map really contains one million entries. (Starting from
    /// size 0 would place the first two allocations at the same address.)
    fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
        let mut result = HashMap::with_capacity(ONE_MILLION as usize);
        let mut addr = 0;
        for size in 1..=ONE_MILLION {
            result.insert(addr, size);
            addr += size;
        }
        result
    }

    // Fake values used to populate the snapshot transmitted by the test.
    const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
    const FAKE_THREAD_KOID: u64 = 8989;
    const FAKE_THREAD_NAME: &str = "fake-thread-name";
    const FAKE_THREAD_KEY: u64 = 1212;
    const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
    const FAKE_STACK_TRACE_KEY: u64 = 1234;
    const FAKE_REGION_ADDRESS: u64 = 8192;
    const FAKE_REGION_SIZE: u64 = 28672;
    const FAKE_REGION_FILE_OFFSET: u64 = 4096;
    const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];

    /// Streams a snapshot containing the given `allocations` (address => size) through a
    /// `Streamer` and verifies that the receiving side reconstructs the same data.
    #[test_case(hashmap! {} ; "empty")]
    #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
    #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
    #[fasync::run_singlethreaded(test)]
    async fn test_streamer(allocations: HashMap<u64, u64>) {
        let (receiver_proxy, receiver_stream) =
            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));

        // Transmit a snapshot with the given `allocations`, all referencing the same thread info
        // and stack trace, with a single executable region.
        let mut streamer = Streamer::new(receiver_proxy)
            .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
                fheapdump_client::ThreadInfo {
                    thread_info_key: Some(FAKE_THREAD_KEY),
                    koid: Some(FAKE_THREAD_KOID),
                    name: Some(FAKE_THREAD_NAME.to_string()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::StackTrace(
                fheapdump_client::StackTrace {
                    stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                    program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
                fheapdump_client::ExecutableRegion {
                    address: Some(FAKE_REGION_ADDRESS),
                    size: Some(FAKE_REGION_SIZE),
                    file_offset: Some(FAKE_REGION_FILE_OFFSET),
                    build_id: Some(fheapdump_client::BuildId {
                        value: FAKE_REGION_BUILD_ID.to_vec(),
                    }),
                    ..Default::default()
                },
            ))
            .await
            .unwrap();
        for (address, size) in &allocations {
            // `push_element` consumes the streamer and hands it back on success.
            streamer = streamer
                .push_element(fheapdump_client::SnapshotElement::Allocation(
                    fheapdump_client::Allocation {
                        address: Some(*address),
                        size: Some(*size),
                        thread_info_key: Some(FAKE_THREAD_KEY),
                        stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                        timestamp: Some(FAKE_TIMESTAMP),
                        ..Default::default()
                    },
                ))
                .await
                .unwrap();
        }
        streamer.end_of_stream().await.unwrap();

        // Receive the snapshot we just transmitted and verify that the allocations and the
        // executable region we received match those that were sent.
        let mut received_snapshot = receive_worker.await.unwrap();
        for (address, size) in &allocations {
            // Remove each expected entry so that leftovers can be detected afterwards.
            let allocation = received_snapshot.allocations.remove(address).unwrap();
            assert_eq!(allocation.size, *size);
            assert_eq!(allocation.thread_info.koid, FAKE_THREAD_KOID);
            assert_eq!(allocation.thread_info.name, FAKE_THREAD_NAME);
            assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
            assert_eq!(allocation.timestamp, FAKE_TIMESTAMP);
        }
        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
        let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
        assert_eq!(region.size, FAKE_REGION_SIZE);
        assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
    }
}