Skip to main content

heapdump_snapshot/
snapshot.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::MonotonicInstant;
6use flex_fuchsia_memory_heapdump_client as fheapdump_client;
7use futures::StreamExt;
8use std::collections::{HashMap, HashSet};
9use std::rc::Rc;
10
11use crate::Error;
12
13/// Contains a snapshot along with metadata from its header.
14#[derive(Debug)]
15pub struct SnapshotWithHeader {
16    pub process_name: String,
17    pub process_koid: u64,
18    pub snapshot: Snapshot,
19}
20
21/// Contains all the data received over a `SnapshotReceiver` channel.
22#[derive(Debug)]
23pub struct Snapshot {
24    /// All the live allocations in the analyzed process, indexed by memory address.
25    pub allocations: Vec<Allocation>,
26
27    /// All the executable memory regions in the analyzed process, indexed by start address.
28    pub executable_regions: HashMap<u64, ExecutableRegion>,
29}
30
31/// Information about one or more allocated memory blocks.
32#[derive(Debug)]
33pub struct Allocation {
34    pub address: Option<u64>,
35
36    /// Number of allocations that have been aggregated into this `Allocation` instance.
37    pub count: u64,
38
39    /// Total size, in bytes.
40    pub size: u64,
41
42    /// Allocating thread.
43    pub thread_info: Option<Rc<ThreadInfo>>,
44
45    /// Stack trace of the allocation site.
46    pub stack_trace: Rc<StackTrace>,
47
48    /// Allocation timestamp, in nanoseconds.
49    pub timestamp: Option<MonotonicInstant>,
50
51    /// Memory dump of this block's contents.
52    pub contents: Option<Vec<u8>>,
53}
54
55/// A stack trace.
56#[derive(Debug)]
57pub struct StackTrace {
58    /// Code addresses at each call frame. The first entry corresponds to the leaf call.
59    pub program_addresses: Vec<u64>,
60}
61
62/// A memory region containing code loaded from an ELF file.
63#[derive(Debug)]
64pub struct ExecutableRegion {
65    /// Region name for human consumption (usually either the ELF soname or the VMO name), if known.
66    pub name: String,
67
68    /// Region size, in bytes.
69    pub size: u64,
70
71    /// The corresponding offset in the ELF file.
72    pub file_offset: u64,
73
74    /// The corresponding relative address in the ELF file.
75    ///
76    /// Note: this field is not populated if the snapshot was generated by a build from before the
77    /// corresponding FIDL field was introduced.
78    pub vaddr: Option<u64>,
79
80    /// The Build ID of the ELF file.
81    pub build_id: Vec<u8>,
82}
83
84/// Information identifying a specific thread.
85#[derive(Debug, PartialEq)]
86pub struct ThreadInfo {
87    /// The thread's koid.
88    pub koid: zx_types::zx_koid_t,
89
90    /// The thread's name.
91    pub name: String,
92}
93
94/// Gets the value of a field in a FIDL table as a `Result<T, Error>`.
95///
96/// An `Err(Error::MissingField { .. })` is returned if the field's value is `None`.
97///
98/// Usage: `read_field!(container_expression => ContainerType, field_name)`
99///
100/// # Example
101///
102/// ```
103/// struct MyFidlTable { field: Option<u32>, .. }
104/// let table = MyFidlTable { field: Some(44), .. };
105///
106/// let val = read_field!(table => MyFidlTable, field)?;
107/// ```
108macro_rules! read_field {
109    ($e:expr => $c:ident, $f:ident) => {
110        $e.$f.ok_or(Error::MissingField {
111            container: std::stringify!($c),
112            field: std::stringify!($f),
113        })
114    };
115}
116
117impl Snapshot {
118    /// Receives a snapshot over a `SnapshotReceiver` channel and reassembles it.
119    pub async fn receive_single_from(
120        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
121    ) -> Result<Snapshot, Error> {
122        Snapshot::receive_inner(&mut stream).await
123    }
124
125    /// Receives multiple header-prefixed snapshots over a `SnapshotReceiver` channel and
126    /// reassemble them.
127    #[cfg(fuchsia_api_level_at_least = "HEAD")]
128    pub async fn receive_multi_from(
129        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
130    ) -> Result<Vec<SnapshotWithHeader>, Error> {
131        let mut snapshots = vec![];
132        loop {
133            // Wait for a batch of elements containing either just a header element or an empty
134            // batch (to signal the end of the stream).
135            match stream.next().await.transpose()? {
136                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
137                    match &batch[..] {
138                        [fheapdump_client::SnapshotElement::SnapshotHeader(header)] => {
139                            responder.send()?;
140
141                            // Receive the actual snapshot.
142                            let snapshot = Snapshot::receive_inner(&mut stream).await?;
143
144                            let header = header.clone();
145                            snapshots.push(SnapshotWithHeader {
146                                process_name: read_field!(header => SnapshotHeader, process_name)?,
147                                process_koid: read_field!(header => SnapshotHeader, process_koid)?,
148                                snapshot,
149                            });
150                        }
151                        [] => {
152                            responder.send()?;
153                            return Ok(snapshots);
154                        }
155                        _ => return Err(Error::HeaderExpected),
156                    }
157                }
158                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
159                    error,
160                    responder,
161                }) => {
162                    let _ = responder.send(); // Ignore the result of the acknowledgment.
163                    return Err(Error::CollectorError(error));
164                }
165                None => return Err(Error::UnexpectedEndOfStream),
166            };
167        }
168    }
169
170    async fn receive_inner(
171        stream: &mut fheapdump_client::SnapshotReceiverRequestStream,
172    ) -> Result<Snapshot, Error> {
173        struct AllocationValue {
174            address: Option<u64>,
175            count: u64,
176            size: u64,
177            thread_info_key: Option<u64>,
178            stack_trace_key: u64,
179            timestamp: Option<MonotonicInstant>,
180        }
181        let mut allocation_addresses: HashSet<u64> = HashSet::new();
182        let mut allocations: Vec<AllocationValue> = vec![];
183        let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
184        let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
185        let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
186        let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
187
188        loop {
189            // Wait for the next batch of elements.
190            let batch = match stream.next().await.transpose()? {
191                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
192                    // Send acknowledgment as quickly as possible, then keep processing the received batch.
193                    responder.send()?;
194                    batch
195                }
196                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
197                    error,
198                    responder,
199                }) => {
200                    let _ = responder.send(); // Ignore the result of the acknowledgment.
201                    return Err(Error::CollectorError(error));
202                }
203                None => return Err(Error::UnexpectedEndOfStream),
204            };
205
206            // Process data. An empty batch signals the end of the stream.
207            if !batch.is_empty() {
208                for element in batch {
209                    match element {
210                        fheapdump_client::SnapshotElement::Allocation(allocation) => {
211                            if let Some(address) = allocation.address {
212                                if !allocation_addresses.insert(address) {
213                                    return Err(Error::ConflictingElement {
214                                        element_type: "Allocation",
215                                    });
216                                }
217                            }
218
219                            #[cfg(not(fuchsia_api_level_at_least = "29"))]
220                            let count = 1;
221                            #[cfg(fuchsia_api_level_at_least = "29")]
222                            let count = allocation.count.unwrap_or(1);
223
224                            let size = read_field!(allocation => Allocation, size)?;
225                            let stack_trace_key =
226                                read_field!(allocation => Allocation, stack_trace_key)?;
227                            allocations.push(AllocationValue {
228                                address: allocation.address,
229                                count,
230                                size,
231                                thread_info_key: allocation.thread_info_key,
232                                stack_trace_key,
233                                timestamp: allocation.timestamp,
234                            });
235                        }
236                        fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
237                            let stack_trace_key =
238                                read_field!(stack_trace => StackTrace, stack_trace_key)?;
239                            let mut program_addresses =
240                                read_field!(stack_trace => StackTrace, program_addresses)?;
241                            stack_traces
242                                .entry(stack_trace_key)
243                                .or_default()
244                                .append(&mut program_addresses);
245                        }
246                        fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
247                            let thread_info_key =
248                                read_field!(thread_info => ThreadInfo, thread_info_key)?;
249                            let koid = read_field!(thread_info => ThreadInfo, koid)?;
250                            let name = read_field!(thread_info => ThreadInfo, name)?;
251                            if thread_infos
252                                .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
253                                .is_some()
254                            {
255                                return Err(Error::ConflictingElement {
256                                    element_type: "ThreadInfo",
257                                });
258                            }
259                        }
260                        fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
261                            let address = read_field!(region => ExecutableRegion, address)?;
262                            #[cfg(fuchsia_api_level_at_least = "27")]
263                            let name = region.name.unwrap_or_else(|| String::new());
264                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
265                            let name = String::new();
266                            let size = read_field!(region => ExecutableRegion, size)?;
267                            let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
268                            #[cfg(fuchsia_api_level_at_least = "27")]
269                            let vaddr = region.vaddr;
270                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
271                            let vaddr = None;
272                            let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
273                            let region =
274                                ExecutableRegion { name, size, file_offset, vaddr, build_id };
275                            if executable_regions.insert(address, region).is_some() {
276                                return Err(Error::ConflictingElement {
277                                    element_type: "ExecutableRegion",
278                                });
279                            }
280                        }
281                        fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
282                            let address = read_field!(block_contents => BlockContents, address)?;
283                            let mut chunk = read_field!(block_contents => BlockContents, contents)?;
284                            contents.entry(address).or_default().append(&mut chunk);
285                        }
286                        _ => return Err(Error::UnexpectedElementType),
287                    }
288                }
289            } else {
290                // We are at the end of the stream. Convert to the final types and resolve
291                // cross-references.
292                let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
293                    .into_iter()
294                    .map(|(key, program_addresses)| {
295                        (key, Rc::new(StackTrace { program_addresses }))
296                    })
297                    .collect();
298                let mut final_allocations = vec![];
299                for AllocationValue {
300                    address,
301                    count,
302                    size,
303                    thread_info_key,
304                    stack_trace_key,
305                    timestamp,
306                } in allocations
307                {
308                    let thread_info = match thread_info_key {
309                        Some(key) => Some(
310                            thread_infos
311                                .get(&key)
312                                .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
313                                .clone(),
314                        ),
315                        None => None,
316                    };
317                    let stack_trace = final_stack_traces
318                        .get(&stack_trace_key)
319                        .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
320                        .clone();
321                    let contents = address.and_then(|address| contents.remove(&address));
322                    if let Some(data) = &contents {
323                        if data.len() as u64 != size {
324                            return Err(Error::ConflictingElement {
325                                element_type: "BlockContents",
326                            });
327                        }
328                    }
329                    final_allocations.push(Allocation {
330                        address,
331                        count,
332                        size,
333                        thread_info,
334                        stack_trace,
335                        timestamp,
336                        contents,
337                    });
338                }
339
340                return Ok(Snapshot { allocations: final_allocations, executable_regions });
341            }
342        }
343    }
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349    use crate::test_helpers::create_client;
350    use assert_matches::assert_matches;
351    use fuchsia_async as fasync;
352    use test_case::test_case;
353
354    // Constants used by some of the tests below:
355    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
356    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
357    const FAKE_ALLOCATION_1_TIMESTAMP: MonotonicInstant = MonotonicInstant::from_nanos(888888888);
358    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
359    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
360    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
361    const FAKE_ALLOCATION_2_TIMESTAMP: MonotonicInstant = MonotonicInstant::from_nanos(-777777777); // test negative value too
362    const FAKE_THREAD_1_KOID: u64 = 1212;
363    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
364    const FAKE_THREAD_1_KEY: u64 = 4567;
365    const FAKE_THREAD_2_KOID: u64 = 1213;
366    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
367    const FAKE_THREAD_2_KEY: u64 = 7654;
368    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
369    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
370    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
371    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;
372    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
373    const FAKE_REGION_1_NAME: &str = "region-1";
374    const FAKE_REGION_1_SIZE: u64 = 0x80000;
375    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
376    const FAKE_REGION_1_VADDR: u64 = 0x3000;
377    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
378    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
379    const FAKE_REGION_2_SIZE: u64 = 0x200000;
380    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
381    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
382
383    #[fasync::run_singlethreaded(test)]
384    async fn test_empty() {
385        let client = create_client();
386        let (receiver_proxy, receiver_stream) =
387            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
388        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
389
390        // Send the end of stream marker.
391        let fut = receiver_proxy.batch(&[]);
392        fut.await.unwrap();
393
394        // Receive the snapshot we just transmitted and verify that it is empty.
395        let received_snapshot = receive_worker.await.unwrap();
396        assert!(received_snapshot.allocations.is_empty());
397        assert!(received_snapshot.executable_regions.is_empty());
398    }
399
400    #[fasync::run_singlethreaded(test)]
401    async fn test_one_batch() {
402        let client = create_client();
403        let (receiver_proxy, receiver_stream) =
404            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
405        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
406
407        // Send a batch containing two allocations - whose threads, stack traces and contents can be
408        // listed before or after the allocation(s) that reference them - and two executable
409        // regions.
410        let fut = receiver_proxy.batch(&[
411            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
412                address: Some(FAKE_ALLOCATION_1_ADDRESS),
413                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
414                ..Default::default()
415            }),
416            fheapdump_client::SnapshotElement::ExecutableRegion(
417                fheapdump_client::ExecutableRegion {
418                    address: Some(FAKE_REGION_1_ADDRESS),
419                    name: Some(FAKE_REGION_1_NAME.to_string()),
420                    size: Some(FAKE_REGION_1_SIZE),
421                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
422                    vaddr: Some(FAKE_REGION_1_VADDR),
423                    build_id: Some(fheapdump_client::BuildId {
424                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
425                    }),
426                    ..Default::default()
427                },
428            ),
429            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
430                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
431                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
432                ..Default::default()
433            }),
434            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
435                address: Some(FAKE_ALLOCATION_1_ADDRESS),
436                size: Some(FAKE_ALLOCATION_1_SIZE),
437                thread_info_key: Some(FAKE_THREAD_1_KEY),
438                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
439                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
440                ..Default::default()
441            }),
442            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
443                thread_info_key: Some(FAKE_THREAD_1_KEY),
444                koid: Some(FAKE_THREAD_1_KOID),
445                name: Some(FAKE_THREAD_1_NAME.to_string()),
446                ..Default::default()
447            }),
448            fheapdump_client::SnapshotElement::ExecutableRegion(
449                fheapdump_client::ExecutableRegion {
450                    address: Some(FAKE_REGION_2_ADDRESS),
451                    size: Some(FAKE_REGION_2_SIZE),
452                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
453                    build_id: Some(fheapdump_client::BuildId {
454                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
455                    }),
456                    ..Default::default()
457                },
458            ),
459            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
460                thread_info_key: Some(FAKE_THREAD_2_KEY),
461                koid: Some(FAKE_THREAD_2_KOID),
462                name: Some(FAKE_THREAD_2_NAME.to_string()),
463                ..Default::default()
464            }),
465            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
466                address: Some(FAKE_ALLOCATION_2_ADDRESS),
467                size: Some(FAKE_ALLOCATION_2_SIZE),
468                thread_info_key: Some(FAKE_THREAD_2_KEY),
469                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
470                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
471                ..Default::default()
472            }),
473            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
474                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
475                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
476                ..Default::default()
477            }),
478        ]);
479        fut.await.unwrap();
480
481        // Send the end of stream marker.
482        let fut = receiver_proxy.batch(&[]);
483        fut.await.unwrap();
484
485        // Receive the snapshot we just transmitted and verify its contents.
486        let mut received_snapshot = receive_worker.await.unwrap();
487        let allocation1 = received_snapshot.allocations.swap_remove(
488            received_snapshot
489                .allocations
490                .iter()
491                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
492                .unwrap(),
493        );
494        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
495        assert_eq!(
496            allocation1.thread_info,
497            Some(Rc::new(ThreadInfo {
498                koid: FAKE_THREAD_1_KOID,
499                name: FAKE_THREAD_1_NAME.to_owned()
500            }))
501        );
502        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
503        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
504        assert_eq!(
505            allocation1.contents.as_ref().expect("contents must be set"),
506            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
507        );
508        let allocation2 = received_snapshot.allocations.swap_remove(
509            received_snapshot
510                .allocations
511                .iter()
512                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
513                .unwrap(),
514        );
515        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
516        assert_eq!(
517            allocation2.thread_info,
518            Some(Rc::new(ThreadInfo {
519                koid: FAKE_THREAD_2_KOID,
520                name: FAKE_THREAD_2_NAME.to_owned()
521            }))
522        );
523        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
524        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
525        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
526        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
527        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
528        assert_eq!(region1.name, FAKE_REGION_1_NAME);
529        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
530        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
531        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
532        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
533        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
534        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
535        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
536        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
537        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
538    }
539
540    #[fasync::run_singlethreaded(test)]
541    async fn test_two_batches() {
542        let client = create_client();
543        let (receiver_proxy, receiver_stream) =
544            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
545        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
546
547        // Send a first batch.
548        let fut = receiver_proxy.batch(&[
549            fheapdump_client::SnapshotElement::ExecutableRegion(
550                fheapdump_client::ExecutableRegion {
551                    address: Some(FAKE_REGION_2_ADDRESS),
552                    size: Some(FAKE_REGION_2_SIZE),
553                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
554                    build_id: Some(fheapdump_client::BuildId {
555                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
556                    }),
557                    ..Default::default()
558                },
559            ),
560            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
561                address: Some(FAKE_ALLOCATION_1_ADDRESS),
562                size: Some(FAKE_ALLOCATION_1_SIZE),
563                thread_info_key: Some(FAKE_THREAD_1_KEY),
564                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
565                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
566                ..Default::default()
567            }),
568            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
569                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
570                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
571                ..Default::default()
572            }),
573            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
574                thread_info_key: Some(FAKE_THREAD_2_KEY),
575                koid: Some(FAKE_THREAD_2_KOID),
576                name: Some(FAKE_THREAD_2_NAME.to_string()),
577                ..Default::default()
578            }),
579        ]);
580        fut.await.unwrap();
581
582        // Send another batch.
583        let fut = receiver_proxy.batch(&[
584            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
585                thread_info_key: Some(FAKE_THREAD_1_KEY),
586                koid: Some(FAKE_THREAD_1_KOID),
587                name: Some(FAKE_THREAD_1_NAME.to_string()),
588                ..Default::default()
589            }),
590            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
591                address: Some(FAKE_ALLOCATION_2_ADDRESS),
592                size: Some(FAKE_ALLOCATION_2_SIZE),
593                thread_info_key: Some(FAKE_THREAD_2_KEY),
594                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
595                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
596                ..Default::default()
597            }),
598            fheapdump_client::SnapshotElement::ExecutableRegion(
599                fheapdump_client::ExecutableRegion {
600                    address: Some(FAKE_REGION_1_ADDRESS),
601                    name: Some(FAKE_REGION_1_NAME.to_string()),
602                    size: Some(FAKE_REGION_1_SIZE),
603                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
604                    vaddr: Some(FAKE_REGION_1_VADDR),
605                    build_id: Some(fheapdump_client::BuildId {
606                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
607                    }),
608                    ..Default::default()
609                },
610            ),
611            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
612                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
613                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
614                ..Default::default()
615            }),
616            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
617                address: Some(FAKE_ALLOCATION_1_ADDRESS),
618                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
619                ..Default::default()
620            }),
621        ]);
622        fut.await.unwrap();
623
624        // Send the end of stream marker.
625        let fut = receiver_proxy.batch(&[]);
626        fut.await.unwrap();
627
628        // Receive the snapshot we just transmitted and verify its contents.
629        let mut received_snapshot = receive_worker.await.unwrap();
630        let allocation1 = received_snapshot.allocations.swap_remove(
631            received_snapshot
632                .allocations
633                .iter()
634                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
635                .unwrap(),
636        );
637        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
638        assert_eq!(
639            allocation1.thread_info,
640            Some(Rc::new(ThreadInfo {
641                koid: FAKE_THREAD_1_KOID,
642                name: FAKE_THREAD_1_NAME.to_owned()
643            }))
644        );
645        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
646        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
647        assert_eq!(
648            allocation1.contents.as_ref().expect("contents must be set"),
649            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
650        );
651        let allocation2 = received_snapshot.allocations.swap_remove(
652            received_snapshot
653                .allocations
654                .iter()
655                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
656                .unwrap(),
657        );
658        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
659        assert_eq!(
660            allocation2.thread_info,
661            Some(Rc::new(ThreadInfo {
662                koid: FAKE_THREAD_2_KOID,
663                name: FAKE_THREAD_2_NAME.to_owned()
664            }))
665        );
666        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
667        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
668        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
669        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
670        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
671        assert_eq!(region1.name, FAKE_REGION_1_NAME);
672        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
673        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
674        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
675        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
676        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
677        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
678        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
679        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
680        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
681    }
682
683    #[test_case(|allocation| allocation.size = None => matches
684        Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
685    #[test_case(|allocation| allocation.stack_trace_key = None => matches
686        Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
687    #[test_case(|allocation| allocation.address = None => matches
688        Ok(_) ; "address_is_optional")]
689    #[test_case(|allocation| allocation.thread_info_key = None => matches
690        Ok(_) ; "thread_info_is_optional")]
691    #[test_case(|allocation| allocation.timestamp = None => matches
692        Ok(_) ; "timestamp_is_optional")]
693    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
694        Ok(_) ; "success")]
695    #[fasync::run_singlethreaded(test)]
696    async fn test_allocation_required_fields(
697        set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
698    ) -> Result<Snapshot, Error> {
699        let client = create_client();
700        let (receiver_proxy, receiver_stream) =
701            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
702        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
703
704        // Start with an Allocation with all the required fields set.
705        let mut allocation = fheapdump_client::Allocation {
706            address: Some(FAKE_ALLOCATION_1_ADDRESS),
707            size: Some(FAKE_ALLOCATION_1_SIZE),
708            thread_info_key: Some(FAKE_THREAD_1_KEY),
709            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
710            timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
711            ..Default::default()
712        };
713
714        // Set one of the fields to None, according to the case being tested.
715        set_one_field_to_none(&mut allocation);
716
717        // Send it to the SnapshotReceiver along with the thread info and stack trace it references.
718        let fut = receiver_proxy.batch(&[
719            fheapdump_client::SnapshotElement::Allocation(allocation),
720            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
721                thread_info_key: Some(FAKE_THREAD_1_KEY),
722                koid: Some(FAKE_THREAD_1_KOID),
723                name: Some(FAKE_THREAD_1_NAME.to_string()),
724                ..Default::default()
725            }),
726            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
727                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
728                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
729                ..Default::default()
730            }),
731        ]);
732        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
733
734        // Send the end of stream marker.
735        let fut = receiver_proxy.batch(&[]);
736        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
737
738        // Return the result.
739        receive_worker.await
740    }
741
742    #[test_case(|thread_info| thread_info.thread_info_key = None => matches
743        Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
744    #[test_case(|thread_info| thread_info.koid = None => matches
745        Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
746    #[test_case(|thread_info| thread_info.name = None => matches
747        Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
748    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
749        Ok(_) ; "success")]
750    #[fasync::run_singlethreaded(test)]
751    async fn test_thread_info_required_fields(
752        set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
753    ) -> Result<Snapshot, Error> {
754        let client = create_client();
755        let (receiver_proxy, receiver_stream) =
756            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
757        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
758
759        // Start with a ThreadInfo with all the required fields set.
760        let mut thread_info = fheapdump_client::ThreadInfo {
761            thread_info_key: Some(FAKE_THREAD_1_KEY),
762            koid: Some(FAKE_THREAD_1_KOID),
763            name: Some(FAKE_THREAD_1_NAME.to_string()),
764            ..Default::default()
765        };
766
767        // Set one of the fields to None, according to the case being tested.
768        set_one_field_to_none(&mut thread_info);
769
770        // Send it to the SnapshotReceiver.
771        let fut =
772            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
773        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
774
775        // Send the end of stream marker.
776        let fut = receiver_proxy.batch(&[]);
777        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
778
779        // Return the result.
780        receive_worker.await
781    }
782
783    #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
784        Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
785    #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
786        Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
787    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
788        Ok(_) ; "success")]
789    #[fasync::run_singlethreaded(test)]
790    async fn test_stack_trace_required_fields(
791        set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
792    ) -> Result<Snapshot, Error> {
793        let client = create_client();
794        let (receiver_proxy, receiver_stream) =
795            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
796        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
797
798        // Start with a StackTrace with all the required fields set.
799        let mut stack_trace = fheapdump_client::StackTrace {
800            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
801            program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
802            ..Default::default()
803        };
804
805        // Set one of the fields to None, according to the case being tested.
806        set_one_field_to_none(&mut stack_trace);
807
808        // Send it to the SnapshotReceiver.
809        let fut =
810            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
811        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
812
813        // Send the end of stream marker.
814        let fut = receiver_proxy.batch(&[]);
815        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
816
817        // Return the result.
818        receive_worker.await
819    }
820
821    #[test_case(|region| region.address = None => matches
822        Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
823    #[test_case(|region| region.size = None => matches
824        Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
825    #[test_case(|region| region.file_offset = None => matches
826        Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
827    #[test_case(|region| region.build_id = None => matches
828        Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
829    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
830        Ok(_) ; "success")]
831    #[fasync::run_singlethreaded(test)]
832    async fn test_executable_region_required_fields(
833        set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
834    ) -> Result<Snapshot, Error> {
835        let client = create_client();
836        let (receiver_proxy, receiver_stream) =
837            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
838        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
839
840        // Start with an ExecutableRegion with all the required fields set.
841        let mut region = fheapdump_client::ExecutableRegion {
842            address: Some(FAKE_REGION_1_ADDRESS),
843            size: Some(FAKE_REGION_1_SIZE),
844            file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
845            build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
846            ..Default::default()
847        };
848
849        // Set one of the fields to None, according to the case being tested.
850        set_one_field_to_none(&mut region);
851
852        // Send it to the SnapshotReceiver.
853        let fut =
854            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
855        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
856
857        // Send the end of stream marker.
858        let fut = receiver_proxy.batch(&[]);
859        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
860
861        // Return the result.
862        receive_worker.await
863    }
864
865    #[test_case(|block_contents| block_contents.address = None => matches
866        Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
867    #[test_case(|block_contents| block_contents.contents = None => matches
868        Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
869    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
870        Ok(_) ; "success")]
871    #[fasync::run_singlethreaded(test)]
872    async fn test_block_contents_required_fields(
873        set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
874    ) -> Result<Snapshot, Error> {
875        let client = create_client();
876        let (receiver_proxy, receiver_stream) =
877            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
878        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
879
880        // Start with a BlockContents with all the required fields set.
881        let mut block_contents = fheapdump_client::BlockContents {
882            address: Some(FAKE_ALLOCATION_1_ADDRESS),
883            contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
884            ..Default::default()
885        };
886
887        // Set one of the fields to None, according to the case being tested.
888        set_one_field_to_none(&mut block_contents);
889
890        // Send it to the SnapshotReceiver along with the allocation it references.
891        let fut = receiver_proxy.batch(&[
892            fheapdump_client::SnapshotElement::BlockContents(block_contents),
893            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
894                address: Some(FAKE_ALLOCATION_1_ADDRESS),
895                size: Some(FAKE_ALLOCATION_1_SIZE),
896                thread_info_key: Some(FAKE_THREAD_1_KEY),
897                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
898                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
899                ..Default::default()
900            }),
901            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
902                thread_info_key: Some(FAKE_THREAD_1_KEY),
903                koid: Some(FAKE_THREAD_1_KOID),
904                name: Some(FAKE_THREAD_1_NAME.to_string()),
905                ..Default::default()
906            }),
907            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
908                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
909                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
910                ..Default::default()
911            }),
912        ]);
913        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
914
915        // Send the end of stream marker.
916        let fut = receiver_proxy.batch(&[]);
917        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
918
919        // Return the result.
920        receive_worker.await
921    }
922
923    #[fasync::run_singlethreaded(test)]
924    async fn test_conflicting_allocations() {
925        let client = create_client();
926        let (receiver_proxy, receiver_stream) =
927            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
928        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
929
930        // Send two allocations with the same address along with the stack trace they reference.
931        let fut = receiver_proxy.batch(&[
932            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
933                address: Some(FAKE_ALLOCATION_1_ADDRESS),
934                size: Some(FAKE_ALLOCATION_1_SIZE),
935                thread_info_key: Some(FAKE_THREAD_1_KEY),
936                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
937                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
938                ..Default::default()
939            }),
940            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
941                address: Some(FAKE_ALLOCATION_1_ADDRESS),
942                size: Some(FAKE_ALLOCATION_1_SIZE),
943                thread_info_key: Some(FAKE_THREAD_1_KEY),
944                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
945                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
946                ..Default::default()
947            }),
948            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
949                thread_info_key: Some(FAKE_THREAD_1_KEY),
950                koid: Some(FAKE_THREAD_1_KOID),
951                name: Some(FAKE_THREAD_1_NAME.to_string()),
952                ..Default::default()
953            }),
954            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
955                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
956                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
957                ..Default::default()
958            }),
959        ]);
960        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
961
962        // Send the end of stream marker.
963        let fut = receiver_proxy.batch(&[]);
964        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
965
966        // Verify expected error.
967        assert_matches!(
968            receive_worker.await,
969            Err(Error::ConflictingElement { element_type: "Allocation" })
970        );
971    }
972
973    #[fasync::run_singlethreaded(test)]
974    async fn test_conflicting_executable_regions() {
975        let client = create_client();
976        let (receiver_proxy, receiver_stream) =
977            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
978        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
979
980        // Send two executable regions with the same address.
981        let fut = receiver_proxy.batch(&[
982            fheapdump_client::SnapshotElement::ExecutableRegion(
983                fheapdump_client::ExecutableRegion {
984                    address: Some(FAKE_REGION_1_ADDRESS),
985                    size: Some(FAKE_REGION_1_SIZE),
986                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
987                    build_id: Some(fheapdump_client::BuildId {
988                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
989                    }),
990                    ..Default::default()
991                },
992            ),
993            fheapdump_client::SnapshotElement::ExecutableRegion(
994                fheapdump_client::ExecutableRegion {
995                    address: Some(FAKE_REGION_1_ADDRESS),
996                    size: Some(FAKE_REGION_1_SIZE),
997                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
998                    build_id: Some(fheapdump_client::BuildId {
999                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
1000                    }),
1001                    ..Default::default()
1002                },
1003            ),
1004        ]);
1005        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1006
1007        // Send the end of stream marker.
1008        let fut = receiver_proxy.batch(&[]);
1009        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1010
1011        // Verify expected error.
1012        assert_matches!(
1013            receive_worker.await,
1014            Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
1015        );
1016    }
1017
1018    #[fasync::run_singlethreaded(test)]
1019    async fn test_block_contents_wrong_size() {
1020        let client = create_client();
1021        let (receiver_proxy, receiver_stream) =
1022            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1023        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1024
1025        // Send an allocation whose BlockContents has the wrong size.
1026        let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
1027        let fut = receiver_proxy.batch(&[
1028            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1029                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1030                size: Some(FAKE_ALLOCATION_1_SIZE),
1031                thread_info_key: Some(FAKE_THREAD_1_KEY),
1032                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1033                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1034                ..Default::default()
1035            }),
1036            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1037                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1038                contents: Some(contents_with_wrong_size),
1039                ..Default::default()
1040            }),
1041            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1042                thread_info_key: Some(FAKE_THREAD_1_KEY),
1043                koid: Some(FAKE_THREAD_1_KOID),
1044                name: Some(FAKE_THREAD_1_NAME.to_string()),
1045                ..Default::default()
1046            }),
1047            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1048                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1049                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1050                ..Default::default()
1051            }),
1052        ]);
1053        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1054
1055        // Send the end of stream marker.
1056        let fut = receiver_proxy.batch(&[]);
1057        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1058
1059        // Verify expected error.
1060        assert_matches!(
1061            receive_worker.await,
1062            Err(Error::ConflictingElement { element_type: "BlockContents" })
1063        );
1064    }
1065
1066    #[fasync::run_singlethreaded(test)]
1067    async fn test_empty_stack_trace() {
1068        let client = create_client();
1069        let (receiver_proxy, receiver_stream) =
1070            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1071        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1072
1073        // Send an allocation that references an empty stack trace.
1074        let fut = receiver_proxy.batch(&[
1075            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1076                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1077                size: Some(FAKE_ALLOCATION_1_SIZE),
1078                thread_info_key: Some(FAKE_THREAD_1_KEY),
1079                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1080                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1081                ..Default::default()
1082            }),
1083            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1084                thread_info_key: Some(FAKE_THREAD_1_KEY),
1085                koid: Some(FAKE_THREAD_1_KOID),
1086                name: Some(FAKE_THREAD_1_NAME.to_string()),
1087                ..Default::default()
1088            }),
1089            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1090                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1091                program_addresses: Some(vec![]),
1092                ..Default::default()
1093            }),
1094        ]);
1095        fut.await.unwrap();
1096
1097        // Send the end of stream marker.
1098        let fut = receiver_proxy.batch(&[]);
1099        fut.await.unwrap();
1100
1101        // Verify that the stack trace has been reconstructed correctly.
1102        let received_snapshot = receive_worker.await.unwrap();
1103        let allocation1 = received_snapshot
1104            .allocations
1105            .iter()
1106            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1107            .unwrap();
1108        assert_eq!(allocation1.stack_trace.program_addresses, []);
1109    }
1110
1111    #[fasync::run_singlethreaded(test)]
1112    async fn test_chunked_stack_trace() {
1113        let client = create_client();
1114        let (receiver_proxy, receiver_stream) =
1115            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1116        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1117
1118        // Send an allocation and the first chunk of its stack trace.
1119        let fut = receiver_proxy.batch(&[
1120            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1121                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1122                size: Some(FAKE_ALLOCATION_1_SIZE),
1123                thread_info_key: Some(FAKE_THREAD_1_KEY),
1124                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1125                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1126                ..Default::default()
1127            }),
1128            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1129                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1130                program_addresses: Some(vec![1111, 2222]),
1131                ..Default::default()
1132            }),
1133            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1134                thread_info_key: Some(FAKE_THREAD_1_KEY),
1135                koid: Some(FAKE_THREAD_1_KOID),
1136                name: Some(FAKE_THREAD_1_NAME.to_string()),
1137                ..Default::default()
1138            }),
1139        ]);
1140        fut.await.unwrap();
1141
1142        // Send the second chunk.
1143        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
1144            fheapdump_client::StackTrace {
1145                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1146                program_addresses: Some(vec![3333]),
1147                ..Default::default()
1148            },
1149        )]);
1150        fut.await.unwrap();
1151
1152        // Send the end of stream marker.
1153        let fut = receiver_proxy.batch(&[]);
1154        fut.await.unwrap();
1155
1156        // Verify that the stack trace has been reconstructed correctly.
1157        let received_snapshot = receive_worker.await.unwrap();
1158        let allocation1 = received_snapshot
1159            .allocations
1160            .iter()
1161            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1162            .unwrap();
1163        assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
1164    }
1165
1166    #[fasync::run_singlethreaded(test)]
1167    async fn test_empty_block_contents() {
1168        let client = create_client();
1169        let (receiver_proxy, receiver_stream) =
1170            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1171        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1172
1173        // Send a zero-sized allocation and its empty contents.
1174        let fut = receiver_proxy.batch(&[
1175            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1176                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1177                size: Some(0),
1178                thread_info_key: Some(FAKE_THREAD_1_KEY),
1179                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1180                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1181                ..Default::default()
1182            }),
1183            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1184                thread_info_key: Some(FAKE_THREAD_1_KEY),
1185                koid: Some(FAKE_THREAD_1_KOID),
1186                name: Some(FAKE_THREAD_1_NAME.to_string()),
1187                ..Default::default()
1188            }),
1189            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1190                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1191                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1192                ..Default::default()
1193            }),
1194            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1195                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1196                contents: Some(vec![]),
1197                ..Default::default()
1198            }),
1199        ]);
1200        fut.await.unwrap();
1201
1202        // Send the end of stream marker.
1203        let fut = receiver_proxy.batch(&[]);
1204        fut.await.unwrap();
1205
1206        // Verify that the allocation has been reconstructed correctly.
1207        let received_snapshot = receive_worker.await.unwrap();
1208        let allocation1 = received_snapshot
1209            .allocations
1210            .iter()
1211            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1212            .unwrap();
1213        assert_eq!(allocation1.contents.as_ref().expect("contents must be set"), &vec![]);
1214    }
1215
1216    #[fasync::run_singlethreaded(test)]
1217    async fn test_chunked_block_contents() {
1218        let client = create_client();
1219        let (receiver_proxy, receiver_stream) =
1220            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1221        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1222
1223        // Split the contents in two halves.
1224        let (content_first_chunk, contents_second_chunk) =
1225            FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1226
1227        // Send an allocation and the first chunk of its contents.
1228        let fut = receiver_proxy.batch(&[
1229            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1230                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1231                size: Some(FAKE_ALLOCATION_1_SIZE),
1232                thread_info_key: Some(FAKE_THREAD_1_KEY),
1233                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1234                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1235                ..Default::default()
1236            }),
1237            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1238                thread_info_key: Some(FAKE_THREAD_1_KEY),
1239                koid: Some(FAKE_THREAD_1_KOID),
1240                name: Some(FAKE_THREAD_1_NAME.to_string()),
1241                ..Default::default()
1242            }),
1243            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1244                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1245                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1246                ..Default::default()
1247            }),
1248            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1249                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1250                contents: Some(content_first_chunk.to_vec()),
1251                ..Default::default()
1252            }),
1253        ]);
1254        fut.await.unwrap();
1255
1256        // Send the second chunk.
1257        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1258            fheapdump_client::BlockContents {
1259                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1260                contents: Some(contents_second_chunk.to_vec()),
1261                ..Default::default()
1262            },
1263        )]);
1264        fut.await.unwrap();
1265
1266        // Send the end of stream marker.
1267        let fut = receiver_proxy.batch(&[]);
1268        fut.await.unwrap();
1269
1270        // Verify that the allocation's block contents have been reconstructed correctly.
1271        let received_snapshot = receive_worker.await.unwrap();
1272        let allocation1 = received_snapshot
1273            .allocations
1274            .iter()
1275            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1276            .unwrap();
1277        assert_eq!(allocation1.contents, Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()));
1278    }
1279
1280    #[fasync::run_singlethreaded(test)]
1281    async fn test_missing_end_of_stream() {
1282        let client = create_client();
1283        let (receiver_proxy, receiver_stream) =
1284            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1285        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1286
1287        // Send an allocation and its stack trace.
1288        let fut = receiver_proxy.batch(&[
1289            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1290                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1291                size: Some(FAKE_ALLOCATION_1_SIZE),
1292                thread_info_key: Some(FAKE_THREAD_1_KEY),
1293                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1294                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1295                ..Default::default()
1296            }),
1297            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1298                thread_info_key: Some(FAKE_THREAD_1_KEY),
1299                koid: Some(FAKE_THREAD_1_KOID),
1300                name: Some(FAKE_THREAD_1_NAME.to_string()),
1301                ..Default::default()
1302            }),
1303            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1304                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1305                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1306                ..Default::default()
1307            }),
1308        ]);
1309        fut.await.unwrap();
1310
1311        // Close the channel without sending an end of stream marker.
1312        std::mem::drop(receiver_proxy);
1313
1314        // Expect an UnexpectedEndOfStream error.
1315        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1316    }
1317
1318    #[fasync::run_singlethreaded(test)]
1319    async fn test_multi_contents() {
1320        let client = create_client();
1321        let (receiver_proxy, receiver_stream) =
1322            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1323        let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1324
1325        // Send two snapshots with different KOIDs.
1326        for koid in [1111, 2222] {
1327            receiver_proxy
1328                .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1329                    fheapdump_client::SnapshotHeader {
1330                        process_name: Some(format!("test-process-{koid}")),
1331                        process_koid: Some(koid),
1332                        ..Default::default()
1333                    },
1334                )])
1335                .await
1336                .unwrap();
1337
1338            receiver_proxy
1339                .batch(&[
1340                    fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1341                        address: Some(FAKE_ALLOCATION_1_ADDRESS),
1342                        size: Some(FAKE_ALLOCATION_1_SIZE),
1343                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1344                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1345                        timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1346                        ..Default::default()
1347                    }),
1348                    fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1349                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1350                        koid: Some(FAKE_THREAD_1_KOID),
1351                        name: Some(FAKE_THREAD_1_NAME.to_string()),
1352                        ..Default::default()
1353                    }),
1354                    fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1355                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1356                        program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1357                        ..Default::default()
1358                    }),
1359                ])
1360                .await
1361                .unwrap();
1362
1363            receiver_proxy.batch(&[]).await.unwrap(); // end of snapshot
1364        }
1365
1366        // Send end of stream marker.
1367        receiver_proxy.batch(&[]).await.unwrap();
1368
1369        // Validate the received snapshots.
1370        let received_snapshots = receive_worker.await.unwrap();
1371        assert_eq!(received_snapshots.len(), 2);
1372        assert_eq!(received_snapshots[0].process_name, "test-process-1111");
1373        assert_eq!(received_snapshots[0].process_koid, 1111);
1374        assert_eq!(received_snapshots[1].process_name, "test-process-2222");
1375        assert_eq!(received_snapshots[1].process_koid, 2222);
1376    }
1377
1378    #[fasync::run_singlethreaded(test)]
1379    async fn test_multi_missing_end_of_stream() {
1380        let client = create_client();
1381        let (receiver_proxy, receiver_stream) =
1382            client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1383        let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1384
1385        // Send two snapshots with different KOIDs.
1386        for koid in [1111, 2222] {
1387            receiver_proxy
1388                .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1389                    fheapdump_client::SnapshotHeader {
1390                        process_name: Some("test-process-name".to_string()),
1391                        process_koid: Some(koid),
1392                        ..Default::default()
1393                    },
1394                )])
1395                .await
1396                .unwrap();
1397
1398            receiver_proxy
1399                .batch(&[
1400                    fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1401                        address: Some(FAKE_ALLOCATION_1_ADDRESS),
1402                        size: Some(FAKE_ALLOCATION_1_SIZE),
1403                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1404                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1405                        timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1406                        ..Default::default()
1407                    }),
1408                    fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1409                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1410                        koid: Some(FAKE_THREAD_1_KOID),
1411                        name: Some(FAKE_THREAD_1_NAME.to_string()),
1412                        ..Default::default()
1413                    }),
1414                    fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1415                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1416                        program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1417                        ..Default::default()
1418                    }),
1419                ])
1420                .await
1421                .unwrap();
1422
1423            receiver_proxy.batch(&[]).await.unwrap(); // end of snapshot
1424        }
1425
1426        // Close the channel without sending an end of stream marker.
1427        std::mem::drop(receiver_proxy);
1428
1429        // Expect an UnexpectedEndOfStream error.
1430        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1431    }
1432}