Skip to main content

stacktrack_snapshot/
streamer.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use flex_fuchsia_memory_stacktrack_client as fstacktrack_client;
6use measure_tape_for_snapshot_element::Measurable;
7use zx_types::ZX_CHANNEL_MAX_MSG_BYTES;
8
9use crate::Error;
10
// Size, in bytes, of the header that starts every FIDL message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_HEADER_BYTES: usize = 16;

// Size, in bytes, of the header that a vector occupies inside a FIDL message.
// TODO(https://fxbug.dev/42181010): This should be a constant in a FIDL library.
const FIDL_VECTOR_HEADER_BYTES: usize = 16;

// Fixed overhead of a `SnapshotReceiver/Batch` FIDL message, i.e. its size when the batch holds
// no elements. The size of a real message is this value plus the size of every element in it.
const EMPTY_BUFFER_SIZE: usize = FIDL_HEADER_BYTES + FIDL_VECTOR_HEADER_BYTES;
22
23/// Implements pagination on top of a SnapshotReceiver channel.
24pub struct Streamer<'a> {
25    dest: &'a mut fstacktrack_client::SnapshotReceiverProxy,
26    buffer: Vec<fstacktrack_client::SnapshotElement>,
27    buffer_size: usize,
28}
29
30impl<'a> Streamer<'a> {
31    /// Prepares to send a snapshot over the given channel.
32    ///
33    /// Takes a mutable reference to be sure that nobody else can write into the channel at the
34    /// same time.
35    pub fn new(dest: &'a mut fstacktrack_client::SnapshotReceiverProxy) -> Streamer<'a> {
36        Streamer { dest, buffer: Vec::new(), buffer_size: EMPTY_BUFFER_SIZE }
37    }
38
39    /// Sends the given `elem`.
40    ///
41    /// This method internally flushes the outgoing buffer, if necessary, so that it never exceeds
42    /// the maximum allowed size.
43    pub async fn push_element(
44        mut self,
45        elem: fstacktrack_client::SnapshotElement,
46    ) -> Result<Self, Error> {
47        let elem_size = elem.measure().num_bytes;
48
49        // Flush the current buffer if the new element would not fit in it.
50        if self.buffer_size + elem_size > ZX_CHANNEL_MAX_MSG_BYTES as usize {
51            self.flush_buffer().await?;
52        }
53
54        // Append the new element.
55        self.buffer.push(elem);
56        self.buffer_size += elem_size;
57
58        Ok(self)
59    }
60
61    /// Sends the end-of-snapshot marker.
62    pub async fn end_of_snapshot(mut self) -> Result<(), Error> {
63        // Send the last elements in the queue.
64        if !self.buffer.is_empty() {
65            self.flush_buffer().await?;
66        }
67
68        // Send an empty batch to signal the end of the snapshot.
69        self.flush_buffer().await?;
70
71        Ok(())
72    }
73
74    async fn flush_buffer(&mut self) -> Result<(), Error> {
75        // Read and reset the buffer.
76        let buffer = std::mem::replace(&mut self.buffer, Vec::new());
77        self.buffer_size = EMPTY_BUFFER_SIZE;
78
79        // Send it.
80        let fut = self.dest.batch(&buffer);
81        Ok(fut.await?)
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::create_client;
    use fuchsia_async as fasync;
    use std::collections::HashMap;
    use test_case::test_case;

    use crate::snapshot::Snapshot;

    const FAKE_PAGE_SIZE: u64 = 4096;
    const FAKE_REGION_ADDRESS: u64 = 0x10000;
    const FAKE_REGION_SIZE: u64 = 0x2000;
    const FAKE_REGION_NAME: &str = "libexample.so";
    const FAKE_REGION_VADDR: u64 = 0x5000;
    const FAKE_REGION_BUILD_ID: &[u8] = &[0xAA, 0xBB, 0xCC, 0xDD];

    /// Builds `n` fake threads, keyed by koid (1000, 1001, ...), each mapped to a synthetic list
    /// of `(program address, frame pointer)` pairs.
    fn generate_fake_threads(n: usize) -> HashMap<u64, Vec<(u64, u64)>> {
        (0..n as u64)
            .map(|i| {
                // The number of frames cycles through 0..=320, so that stack traces of many
                // different lengths are generated.
                let frames: Vec<(u64, u64)> =
                    (0..(i % 321)).map(|j| ((i + j) ^ 0x1111, (i + j) ^ 0x2222)).collect();
                (1000 + i, frames)
            })
            .collect()
    }

    /// Wraps a fake thread into the `StackTrace` snapshot element that describes it.
    fn stack_trace_element(
        thread_koid: u64,
        frames: &[(u64, u64)],
    ) -> fstacktrack_client::SnapshotElement {
        let call_frames: Vec<_> = frames
            .iter()
            .map(|&(program_address, frame_pointer)| fstacktrack_client::CallFrame {
                program_address,
                frame_pointer,
            })
            .collect();

        fstacktrack_client::SnapshotElement::StackTrace(fstacktrack_client::StackTrace {
            thread_koid: Some(thread_koid),
            frames: Some(call_frames),
            ..Default::default()
        })
    }

    /// Streams a whole snapshot and verifies that the receiving side reassembles it unchanged.
    #[test_case(0)]
    #[test_case(1)]
    #[test_case(100000)]
    #[fasync::run_singlethreaded(test)]
    async fn test_streamer(num_threads: usize) {
        let fake_threads = generate_fake_threads(num_threads);

        let client = create_client();
        let (mut receiver_proxy, receiver_stream) =
            client.create_proxy_and_stream::<fstacktrack_client::SnapshotReceiverMarker>();
        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));

        // Transmit a snapshot with a page size, an executable region and stack traces.
        let page_size_element = fstacktrack_client::SnapshotElement::PageSize(FAKE_PAGE_SIZE);
        let region_element = fstacktrack_client::SnapshotElement::ExecutableRegion(
            fstacktrack_client::ExecutableRegion {
                address: Some(FAKE_REGION_ADDRESS),
                size: Some(FAKE_REGION_SIZE),
                name: Some(FAKE_REGION_NAME.to_string()),
                vaddr: Some(FAKE_REGION_VADDR),
                build_id: Some(fstacktrack_client::BuildId {
                    value: FAKE_REGION_BUILD_ID.to_vec(),
                }),
                ..Default::default()
            },
        );

        let mut streamer = Streamer::new(&mut receiver_proxy);
        streamer = streamer.push_element(page_size_element).await.unwrap();
        streamer = streamer.push_element(region_element).await.unwrap();
        for (koid, frames) in &fake_threads {
            streamer = streamer.push_element(stack_trace_element(*koid, frames)).await.unwrap();
        }
        streamer.end_of_snapshot().await.unwrap();

        // Receive the snapshot we just transmitted and verify its contents.
        let received_snapshot = receive_worker.await.unwrap();
        assert_eq!(received_snapshot.page_size, FAKE_PAGE_SIZE);

        // Exactly one executable region, carrying the values that were sent.
        assert_eq!(received_snapshot.executable_regions.len(), 1);
        let region = &received_snapshot.executable_regions[0];
        assert_eq!(region.address, FAKE_REGION_ADDRESS);
        assert_eq!(region.size, FAKE_REGION_SIZE);
        assert_eq!(region.vaddr, FAKE_REGION_VADDR);
        assert_eq!(region.name, FAKE_REGION_NAME);
        assert_eq!(region.build_id, FAKE_REGION_BUILD_ID);

        // Index the received stack traces by koid. Each expected thread removes its own entry, so
        // anything left over at the end is a stack trace that was never sent.
        let mut received_stack_traces: HashMap<u64, &crate::snapshot::StackTrace> =
            received_snapshot.stack_traces.iter().map(|trace| (trace.thread_koid, trace)).collect();
        for (expected_koid, expected_frames) in &fake_threads {
            let received_trace = received_stack_traces.remove(expected_koid).unwrap();

            assert_eq!(received_trace.frames.len(), expected_frames.len());
            for (received_frame, (expected_pc, expected_fp)) in
                received_trace.frames.iter().zip(expected_frames)
            {
                assert_eq!(received_frame.program_address, *expected_pc);
                assert_eq!(received_frame.frame_pointer, *expected_fp);
            }
        }
        assert!(received_stack_traces.is_empty(), "all the entries have been removed");
    }
}