heapdump_snapshot/
streamer.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
11// Number of bytes the header of a vector occupies in a fidl message.
12// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
13const FIDL_VECTOR_HEADER_BYTES: usize = 16;
14
15// Number of bytes the header of a fidl message occupies.
16// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
17const FIDL_HEADER_BYTES: usize = 16;
18
19// Size of the fixed part of a `SnapshotReceiver/Batch` FIDL message. The actual size is given by
20// this number plus the size of each element in the batch.
21const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23/// Implements pagination on top of a SnapshotReceiver channel.
24pub struct Streamer<'a> {
25    dest: &'a mut fheapdump_client::SnapshotReceiverProxy,
26    buffer: Vec<fheapdump_client::SnapshotElement>,
27    buffer_size: usize,
28}
29
30impl<'a> Streamer<'a> {
31    /// Prepares to send a snapshot over the given channel.
32    ///
33    /// Takes a mutable reference to be sure that nobody else can write into the channel at the
34    /// same time.
35    pub fn new(dest: &'a mut fheapdump_client::SnapshotReceiverProxy) -> Streamer<'a> {
36        Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
37    }
38
39    /// Sends the given `elem`.
40    ///
41    /// This method internally flushes the outgoing buffer, if necessary, so that it never exceeds
42    /// the maximum allowed size.
43    pub async fn push_element(
44        mut self,
45        elem: fheapdump_client::SnapshotElement,
46    ) -> Result<Self, Error> {
47        let elem_size = elem.measure().num_bytes;
48
49        // Flush the current buffer if the new element would not fit in it.
50        if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
51            self.flush_buffer().await?;
52        }
53
54        // Append the new element.
55        self.buffer.push(elem);
56        self.buffer_size += elem_size;
57
58        Ok(self)
59    }
60
61    /// Sends the end-of-snapshot marker.
62    pub async fn end_of_snapshot(mut self) -> Result<(), Error> {
63        // Send the last elements in the queue.
64        if !self.buffer.is_empty() {
65            self.flush_buffer().await?;
66        }
67
68        // Send an empty batch to signal the end of the snapshot.
69        self.flush_buffer().await?;
70
71        Ok(())
72    }
73
74    async fn flush_buffer(&mut self) -> Result<(), Error> {
75        // Read and reset the buffer.
76        let buffer = std::mem::replace(&mut self.buffer, Vec::new());
77        self.buffer_size = EMPTY_BUFFER_SIZE;
78
79        // Send it.
80        let fut = self.dest.batch(&buffer);
81        Ok(fut.await?)
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use fidl::endpoints::create_proxy_and_stream;
89    use fuchsia_async as fasync;
90    use maplit::hashmap;
91    use std::collections::HashMap;
92    use std::rc::Rc;
93    use test_case::test_case;
94
95    use crate::ThreadInfo;
96    use crate::snapshot::Snapshot;
97
98    // Generate an allocation hash map with a huge number of entries, to test that pagination splits
99    // them properly.
100    fn generate_one_million_allocations_hashmap() -> HashMap<u64, u64> {
101        let mut result = HashMap::new();
102        let mut addr = 0;
103        for size in 0..1000000 {
104            result.insert(addr, size);
105            addr += size;
106        }
107        result
108    }
109
110    const FAKE_TIMESTAMP: fidl::MonotonicInstant = fidl::MonotonicInstant::from_nanos(12345678);
111    const FAKE_THREAD_KOID: u64 = 8989;
112    const FAKE_THREAD_NAME: &str = "fake-thread-name";
113    const FAKE_THREAD_KEY: u64 = 1212;
114    const FAKE_STACK_TRACE_ADDRESSES: [u64; 3] = [11111, 22222, 33333];
115    const FAKE_STACK_TRACE_KEY: u64 = 1234;
116    const FAKE_REGION_ADDRESS: u64 = 8192;
117    const FAKE_REGION_NAME: &str = "fake-region-name";
118    const FAKE_REGION_SIZE: u64 = 28672;
119    const FAKE_REGION_FILE_OFFSET: u64 = 4096;
120    const FAKE_REGION_VADDR: u64 = 12288;
121    const FAKE_REGION_BUILD_ID: &[u8] = &[0xee; 20];
122
123    #[test_case(hashmap! {} ; "empty")]
124    #[test_case(hashmap! { 1234 => 5678 } ; "only one")]
125    #[test_case(generate_one_million_allocations_hashmap() ; "one million")]
126    #[fasync::run_singlethreaded(test)]
127    async fn test_streamer(allocations: HashMap<u64, u64>) {
128        let (mut receiver_proxy, receiver_stream) =
129            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
130        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
131
132        // Transmit a snapshot with the given `allocations`, all referencing the same thread info
133        // and stack trace, with a single executable region.
134        let mut streamer = Streamer::new(&mut receiver_proxy)
135            .push_element(fheapdump_client::SnapshotElement::ThreadInfo(
136                fheapdump_client::ThreadInfo {
137                    thread_info_key: Some(FAKE_THREAD_KEY),
138                    koid: Some(FAKE_THREAD_KOID),
139                    name: Some(FAKE_THREAD_NAME.to_string()),
140                    ..Default::default()
141                },
142            ))
143            .await
144            .unwrap()
145            .push_element(fheapdump_client::SnapshotElement::StackTrace(
146                fheapdump_client::StackTrace {
147                    stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
148                    program_addresses: Some(FAKE_STACK_TRACE_ADDRESSES.to_vec()),
149                    ..Default::default()
150                },
151            ))
152            .await
153            .unwrap()
154            .push_element(fheapdump_client::SnapshotElement::ExecutableRegion(
155                fheapdump_client::ExecutableRegion {
156                    address: Some(FAKE_REGION_ADDRESS),
157                    name: Some(FAKE_REGION_NAME.to_string()),
158                    size: Some(FAKE_REGION_SIZE),
159                    file_offset: Some(FAKE_REGION_FILE_OFFSET),
160                    vaddr: Some(FAKE_REGION_VADDR),
161                    build_id: Some(fheapdump_client::BuildId {
162                        value: FAKE_REGION_BUILD_ID.to_vec(),
163                    }),
164                    ..Default::default()
165                },
166            ))
167            .await
168            .unwrap();
169        for (address, size) in &allocations {
170            streamer = streamer
171                .push_element(fheapdump_client::SnapshotElement::Allocation(
172                    fheapdump_client::Allocation {
173                        address: Some(*address),
174                        size: Some(*size),
175                        thread_info_key: Some(FAKE_THREAD_KEY),
176                        stack_trace_key: Some(FAKE_STACK_TRACE_KEY),
177                        timestamp: Some(FAKE_TIMESTAMP),
178                        ..Default::default()
179                    },
180                ))
181                .await
182                .unwrap();
183        }
184        streamer.end_of_snapshot().await.unwrap();
185
186        // Receive the snapshot we just transmitted and verify that the allocations and the
187        // executable region we received match those that were sent.
188        let mut received_snapshot = receive_worker.await.unwrap();
189        let mut received_allocations: HashMap<u64, &crate::snapshot::Allocation> =
190            received_snapshot
191                .allocations
192                .iter()
193                .map(|alloc| (alloc.address.unwrap(), alloc))
194                .collect();
195        for (address, size) in &allocations {
196            let allocation = received_allocations.remove(address).unwrap();
197
198            assert_eq!(allocation.size, *size);
199            assert_eq!(
200                allocation.thread_info,
201                Some(Rc::new(ThreadInfo {
202                    koid: FAKE_THREAD_KOID,
203                    name: FAKE_THREAD_NAME.to_owned()
204                }))
205            );
206            assert_eq!(allocation.stack_trace.program_addresses, FAKE_STACK_TRACE_ADDRESSES);
207            assert_eq!(allocation.timestamp, Some(FAKE_TIMESTAMP));
208        }
209        assert!(received_allocations.is_empty(), "all the entries have been removed");
210        let region = received_snapshot.executable_regions.remove(&FAKE_REGION_ADDRESS).unwrap();
211        assert_eq!(region.name, FAKE_REGION_NAME);
212        assert_eq!(region.size, FAKE_REGION_SIZE);
213        assert_eq!(region.file_offset, FAKE_REGION_FILE_OFFSET);
214        assert_eq!(region.vaddr.unwrap(), FAKE_REGION_VADDR);
215        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);
216        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
217    }
218}