heapdump_snapshot/
streamer.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
/// Number of bytes the header of a fidl message occupies.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_HEADER_BYTES: usize = 16;

/// Number of bytes the header of a vector occupies in a fidl message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;

/// Size of the fixed part of a `SnapshotReceiver/Batch` FIDL message, i.e. the message header
/// followed by the header of the vector of elements.
///
/// The size of an actual message is this value plus the size of each element in the batch.
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23/// Implements pagination on top of a SnapshotReceiver channel.
24pub struct Streamer {
25    dest: fheapdump_client::SnapshotReceiverProxy,
26    buffer: Vec<fheapdump_client::SnapshotElement>,
27    buffer_size: usize,
28}
29
30impl Streamer {
31    pub fn new(dest: fheapdump_client::SnapshotReceiverProxy) -> Streamer {
32        Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
33    }
34
35    /// Sends the given `elem`.
36    ///
37    /// This method internally flushes the outgoing buffer, if necessary, so that it never exceeds
38    /// the maximum allowed size.
39    pub async fn push_element(
40        mut self,
41        elem: fheapdump_client::SnapshotElement,
42    ) -> Result<Streamer, Error> {
43        let elem_size = elem.measure().num_bytes;
44
45        // Flush the current buffer if the new element would not fit in it.
46        if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
47            self.flush_buffer().await?;
48        }
49
50        // Append the new element.
51        self.buffer.push(elem);
52        self.buffer_size += elem_size;
53
54        Ok(self)
55    }
56
57    /// Sends the end-of-stream marker.
58    pub async fn end_of_stream(mut self) -> Result<(), Error> {
59        // Send the last elements in the queue.
60        if !self.buffer.is_empty() {
61            self.flush_buffer().await?;
62        }
63
64        // Send an empty batch to signal the end of the stream.
65        self.flush_buffer().await?;
66
67        Ok(())
68    }
69
70    async fn flush_buffer(&mut self) -> Result<(), Error> {
71        // Read and reset the buffer.
72        let buffer = std::mem::replace(&mut self.buffer, Vec::new());
73        self.buffer_size = EMPTY_BUFFER_SIZE;
74
75        // Send it.
76        let fut = self.dest.batch(&buffer);
77        Ok(fut.await?)
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use fidl::endpoints::create_proxy_and_stream;
    use fuchsia_async as fasync;
    use maplit::hashmap;
    use std::collections::HashMap;
    use test_case::test_case;

    use crate::snapshot::Snapshot;

    // Number of entries generated by `generate_one_million_allocations_hashmap`.
    const ONE_MILLION: u64 = 1_000_000;

    // Generate an allocation hash map with a huge number of entries, to test that pagination splits
    // them properly.
    //
    // Allocations are laid out back to back starting from address 0. Sizes start at 1 (not 0) so
    // that every allocation gets a distinct address: a zero-sized first allocation would not
    // advance the address and would be overwritten by the next insertion.
    fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
        let mut result = HashMap::with_capacity(ONE_MILLION as usize);
        let mut addr = 0;
        for size in 1..=ONE_MILLION {
            result.insert(addr, size);
            addr += size;
        }
        result
    }

    const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
    const FAKE_THREAD_KOID: u64 = 8989;
    const FAKE_THREAD_NAME: &str = "fake-thread-name";
    const FAKE_THREAD_KEY: u64 = 1212;
    const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
    const FAKE_STACK_TRACE_KEY: u64 = 1234;
    const FAKE_REGION_ADDRESS: u64 = 8192;
    const FAKE_REGION_SIZE: u64 = 28672;
    const FAKE_REGION_FILE_OFFSET: u64 = 4096;
    const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];

    // Streams a snapshot containing the given `allocations` through a `Streamer` and verifies that
    // the receiving end reconstructs the same data.
    #[test_case(hashmap! {} ; "empty")]
    #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
    #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
    #[fasync::run_singlethreaded(test)]
    async fn test_streamer(allocations: HashMap<u64, u64>) {
        let (receiver_proxy, receiver_stream) =
            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));

        // Transmit a snapshot with the given `allocations`, all referencing the same thread info
        // and stack trace, with a single executable region.
        let mut streamer = Streamer::new(receiver_proxy)
            .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
                fheapdump_client::ThreadInfo {
                    thread_info_key: Some(FAKE_THREAD_KEY),
                    koid: Some(FAKE_THREAD_KOID),
                    name: Some(FAKE_THREAD_NAME.to_string()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::StackTrace(
                fheapdump_client::StackTrace {
                    stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                    program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
                    ..Default::default()
                },
            ))
            .await
            .unwrap()
            .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
                fheapdump_client::ExecutableRegion {
                    address: Some(FAKE_REGION_ADDRESS),
                    size: Some(FAKE_REGION_SIZE),
                    file_offset: Some(FAKE_REGION_FILE_OFFSET),
                    build_id: Some(fheapdump_client::BuildId {
                        value: FAKE_REGION_BUILD_ID.to_vec(),
                    }),
                    ..Default::default()
                },
            ))
            .await
            .unwrap();
        for (address, size) in &allocations {
            streamer = streamer
                .push_element(fheapdump_client::SnapshotElement::Allocation(
                    fheapdump_client::Allocation {
                        address: Some(*address),
                        size: Some(*size),
                        thread_info_key: Some(FAKE_THREAD_KEY),
                        stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
                        timestamp: Some(FAKE_TIMESTAMP),
                        ..Default::default()
                    },
                ))
                .await
                .unwrap();
        }
        streamer.end_of_stream().await.unwrap();

        // Receive the snapshot we just transmitted and verify that the allocations and the
        // executable region we received match those that were sent.
        let mut received_snapshot = receive_worker.await.unwrap();
        for (address, size) in &allocations {
            let allocation = received_snapshot.allocations.remove(address).unwrap();
            assert_eq!(allocation.size, *size);
            assert_eq!(allocation.thread_info.koid, FAKE_THREAD_KOID);
            assert_eq!(allocation.thread_info.name, FAKE_THREAD_NAME);
            assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
            assert_eq!(allocation.timestamp, FAKE_TIMESTAMP);
        }
        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
        let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
        assert_eq!(region.size, FAKE_REGION_SIZE);
        assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
    }
}