heapdump_snapshot/
snapshot.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use futures::StreamExt;
7use std::collections::{HashMap, HashSet};
8use std::rc::Rc;
9
10use crate::Error;
11
12/// Contains a snapshot along with metadata from its header.
13#[derive(Debug)]
14pub struct SnapshotWithHeader {
15    pub process_name: String,
16    pub process_koid: u64,
17    pub snapshot: Snapshot,
18}
19
20/// Contains all the data received over a `SnapshotReceiver` channel.
21#[derive(Debug)]
22pub struct Snapshot {
23    /// All the live allocations in the analyzed process, indexed by memory address.
24    pub allocations: Vec<Allocation>,
25
26    /// All the executable memory regions in the analyzed process, indexed by start address.
27    pub executable_regions: HashMap<u64, ExecutableRegion>,
28}
29
30/// Information about one or more allocated memory blocks.
31#[derive(Debug)]
32pub struct Allocation {
33    pub address: Option<u64>,
34
35    /// Number of allocations that have been aggregated into this `Allocation` instance.
36    pub count: u64,
37
38    /// Total size, in bytes.
39    pub size: u64,
40
41    /// Allocating thread.
42    pub thread_info: Option<Rc<ThreadInfo>>,
43
44    /// Stack trace of the allocation site.
45    pub stack_trace: Rc<StackTrace>,
46
47    /// Allocation timestamp, in nanoseconds.
48    pub timestamp: Option<fidl::MonotonicInstant>,
49
50    /// Memory dump of this block's contents.
51    pub contents: Option<Vec<u8>>,
52}
53
54/// A stack trace.
55#[derive(Debug)]
56pub struct StackTrace {
57    /// Code addresses at each call frame. The first entry corresponds to the leaf call.
58    pub program_addresses: Vec<u64>,
59}
60
61/// A memory region containing code loaded from an ELF file.
62#[derive(Debug)]
63pub struct ExecutableRegion {
64    /// Region name for human consumption (usually either the ELF soname or the VMO name), if known.
65    pub name: String,
66
67    /// Region size, in bytes.
68    pub size: u64,
69
70    /// The corresponding offset in the ELF file.
71    pub file_offset: u64,
72
73    /// The corresponding relative address in the ELF file.
74    ///
75    /// Note: this field is not populated if the snapshot was generated by a build from before the
76    /// corresponding FIDL field was introduced.
77    pub vaddr: Option<u64>,
78
79    /// The Build ID of the ELF file.
80    pub build_id: Vec<u8>,
81}
82
83/// Information identifying a specific thread.
84#[derive(Debug, PartialEq)]
85pub struct ThreadInfo {
86    /// The thread's koid.
87    pub koid: zx_types::zx_koid_t,
88
89    /// The thread's name.
90    pub name: String,
91}
92
93/// Gets the value of a field in a FIDL table as a `Result<T, Error>`.
94///
95/// An `Err(Error::MissingField { .. })` is returned if the field's value is `None`.
96///
97/// Usage: `read_field!(container_expression => ContainerType, field_name)`
98///
99/// # Example
100///
101/// ```
102/// struct MyFidlTable { field: Option<u32>, .. }
103/// let table = MyFidlTable { field: Some(44), .. };
104///
105/// let val = read_field!(table => MyFidlTable, field)?;
106/// ```
107macro_rules! read_field {
108    ($e:expr => $c:ident, $f:ident) => {
109        $e.$f.ok_or(Error::MissingField {
110            container: std::stringify!($c),
111            field: std::stringify!($f),
112        })
113    };
114}
115
116impl Snapshot {
117    /// Receives a snapshot over a `SnapshotReceiver` channel and reassembles it.
118    pub async fn receive_single_from(
119        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
120    ) -> Result<Snapshot, Error> {
121        Snapshot::receive_inner(&mut stream).await
122    }
123
124    /// Receives multiple header-prefixed snapshots over a `SnapshotReceiver` channel and
125    /// reassemble them.
126    #[cfg(fuchsia_api_level_at_least = "HEAD")]
127    pub async fn receive_multi_from(
128        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
129    ) -> Result<Vec<SnapshotWithHeader>, Error> {
130        let mut snapshots = vec![];
131        loop {
132            // Wait for a batch of elements containing either just a header element or an empty
133            // batch (to signal the end of the stream).
134            match stream.next().await.transpose()? {
135                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
136                    match &batch[..] {
137                        [fheapdump_client::SnapshotElement::SnapshotHeader(header)] => {
138                            responder.send()?;
139
140                            // Receive the actual snapshot.
141                            let snapshot = Snapshot::receive_inner(&mut stream).await?;
142
143                            let header = header.clone();
144                            snapshots.push(SnapshotWithHeader {
145                                process_name: read_field!(header => SnapshotHeader, process_name)?,
146                                process_koid: read_field!(header => SnapshotHeader, process_koid)?,
147                                snapshot,
148                            });
149                        }
150                        [] => {
151                            responder.send()?;
152                            return Ok(snapshots);
153                        }
154                        _ => return Err(Error::HeaderExpected),
155                    }
156                }
157                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
158                    error,
159                    responder,
160                }) => {
161                    let _ = responder.send(); // Ignore the result of the acknowledgment.
162                    return Err(Error::CollectorError(error));
163                }
164                None => return Err(Error::UnexpectedEndOfStream),
165            };
166        }
167    }
168
169    async fn receive_inner(
170        stream: &mut fheapdump_client::SnapshotReceiverRequestStream,
171    ) -> Result<Snapshot, Error> {
172        struct AllocationValue {
173            address: Option<u64>,
174            count: u64,
175            size: u64,
176            thread_info_key: Option<u64>,
177            stack_trace_key: u64,
178            timestamp: Option<fidl::MonotonicInstant>,
179        }
180        let mut allocation_addresses: HashSet<u64> = HashSet::new();
181        let mut allocations: Vec<AllocationValue> = vec![];
182        let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
183        let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
184        let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
185        let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
186
187        loop {
188            // Wait for the next batch of elements.
189            let batch = match stream.next().await.transpose()? {
190                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
191                    // Send acknowledgment as quickly as possible, then keep processing the received batch.
192                    responder.send()?;
193                    batch
194                }
195                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
196                    error,
197                    responder,
198                }) => {
199                    let _ = responder.send(); // Ignore the result of the acknowledgment.
200                    return Err(Error::CollectorError(error));
201                }
202                None => return Err(Error::UnexpectedEndOfStream),
203            };
204
205            // Process data. An empty batch signals the end of the stream.
206            if !batch.is_empty() {
207                for element in batch {
208                    match element {
209                        fheapdump_client::SnapshotElement::Allocation(allocation) => {
210                            if let Some(address) = allocation.address {
211                                if !allocation_addresses.insert(address) {
212                                    return Err(Error::ConflictingElement {
213                                        element_type: "Allocation",
214                                    });
215                                }
216                            }
217
218                            #[cfg(not(fuchsia_api_level_at_least = "29"))]
219                            let count = 1;
220                            #[cfg(fuchsia_api_level_at_least = "29")]
221                            let count = allocation.count.unwrap_or(1);
222
223                            let size = read_field!(allocation => Allocation, size)?;
224                            let stack_trace_key =
225                                read_field!(allocation => Allocation, stack_trace_key)?;
226                            allocations.push(AllocationValue {
227                                address: allocation.address,
228                                count,
229                                size,
230                                thread_info_key: allocation.thread_info_key,
231                                stack_trace_key,
232                                timestamp: allocation.timestamp,
233                            });
234                        }
235                        fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
236                            let stack_trace_key =
237                                read_field!(stack_trace => StackTrace, stack_trace_key)?;
238                            let mut program_addresses =
239                                read_field!(stack_trace => StackTrace, program_addresses)?;
240                            stack_traces
241                                .entry(stack_trace_key)
242                                .or_default()
243                                .append(&mut program_addresses);
244                        }
245                        fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
246                            let thread_info_key =
247                                read_field!(thread_info => ThreadInfo, thread_info_key)?;
248                            let koid = read_field!(thread_info => ThreadInfo, koid)?;
249                            let name = read_field!(thread_info => ThreadInfo, name)?;
250                            if thread_infos
251                                .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
252                                .is_some()
253                            {
254                                return Err(Error::ConflictingElement {
255                                    element_type: "ThreadInfo",
256                                });
257                            }
258                        }
259                        fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
260                            let address = read_field!(region => ExecutableRegion, address)?;
261                            #[cfg(fuchsia_api_level_at_least = "27")]
262                            let name = region.name.unwrap_or_else(|| String::new());
263                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
264                            let name = String::new();
265                            let size = read_field!(region => ExecutableRegion, size)?;
266                            let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
267                            #[cfg(fuchsia_api_level_at_least = "27")]
268                            let vaddr = region.vaddr;
269                            #[cfg(not(fuchsia_api_level_at_least = "27"))]
270                            let vaddr = None;
271                            let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
272                            let region =
273                                ExecutableRegion { name, size, file_offset, vaddr, build_id };
274                            if executable_regions.insert(address, region).is_some() {
275                                return Err(Error::ConflictingElement {
276                                    element_type: "ExecutableRegion",
277                                });
278                            }
279                        }
280                        fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
281                            let address = read_field!(block_contents => BlockContents, address)?;
282                            let mut chunk = read_field!(block_contents => BlockContents, contents)?;
283                            contents.entry(address).or_default().append(&mut chunk);
284                        }
285                        _ => return Err(Error::UnexpectedElementType),
286                    }
287                }
288            } else {
289                // We are at the end of the stream. Convert to the final types and resolve
290                // cross-references.
291                let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
292                    .into_iter()
293                    .map(|(key, program_addresses)| {
294                        (key, Rc::new(StackTrace { program_addresses }))
295                    })
296                    .collect();
297                let mut final_allocations = vec![];
298                for AllocationValue {
299                    address,
300                    count,
301                    size,
302                    thread_info_key,
303                    stack_trace_key,
304                    timestamp,
305                } in allocations
306                {
307                    let thread_info = match thread_info_key {
308                        Some(key) => Some(
309                            thread_infos
310                                .get(&key)
311                                .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
312                                .clone(),
313                        ),
314                        None => None,
315                    };
316                    let stack_trace = final_stack_traces
317                        .get(&stack_trace_key)
318                        .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
319                        .clone();
320                    let contents = address.and_then(|address| contents.remove(&address));
321                    if let Some(data) = &contents {
322                        if data.len() as u64 != size {
323                            return Err(Error::ConflictingElement {
324                                element_type: "BlockContents",
325                            });
326                        }
327                    }
328                    final_allocations.push(Allocation {
329                        address,
330                        count,
331                        size,
332                        thread_info,
333                        stack_trace,
334                        timestamp,
335                        contents,
336                    });
337                }
338
339                return Ok(Snapshot { allocations: final_allocations, executable_regions });
340            }
341        }
342    }
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use assert_matches::assert_matches;
349    use fidl::endpoints::create_proxy_and_stream;
350    use fuchsia_async as fasync;
351    use test_case::test_case;
352
353    // Constants used by some of the tests below:
354    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
355    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
356    const FAKE_ALLOCATION_1_TIMESTAMP: fidl::MonotonicInstant =
357        fidl::MonotonicInstant::from_nanos(888888888);
358    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
359    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
360    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
361    const FAKE_ALLOCATION_2_TIMESTAMP: fidl::MonotonicInstant =
362        fidl::MonotonicInstant::from_nanos(-777777777); // test negative value too
363    const FAKE_THREAD_1_KOID: u64 = 1212;
364    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
365    const FAKE_THREAD_1_KEY: u64 = 4567;
366    const FAKE_THREAD_2_KOID: u64 = 1213;
367    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
368    const FAKE_THREAD_2_KEY: u64 = 7654;
369    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
370    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
371    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
372    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;
373    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
374    const FAKE_REGION_1_NAME: &str = "region-1";
375    const FAKE_REGION_1_SIZE: u64 = 0x80000;
376    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
377    const FAKE_REGION_1_VADDR: u64 = 0x3000;
378    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
379    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
380    const FAKE_REGION_2_SIZE: u64 = 0x200000;
381    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
382    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
383
384    #[fasync::run_singlethreaded(test)]
385    async fn test_empty() {
386        let (receiver_proxy, receiver_stream) =
387            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
388        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
389
390        // Send the end of stream marker.
391        let fut = receiver_proxy.batch(&[]);
392        fut.await.unwrap();
393
394        // Receive the snapshot we just transmitted and verify that it is empty.
395        let received_snapshot = receive_worker.await.unwrap();
396        assert!(received_snapshot.allocations.is_empty());
397        assert!(received_snapshot.executable_regions.is_empty());
398    }
399
400    #[fasync::run_singlethreaded(test)]
401    async fn test_one_batch() {
402        let (receiver_proxy, receiver_stream) =
403            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
404        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
405
406        // Send a batch containing two allocations - whose threads, stack traces and contents can be
407        // listed before or after the allocation(s) that reference them - and two executable
408        // regions.
409        let fut = receiver_proxy.batch(&[
410            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
411                address: Some(FAKE_ALLOCATION_1_ADDRESS),
412                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
413                ..Default::default()
414            }),
415            fheapdump_client::SnapshotElement::ExecutableRegion(
416                fheapdump_client::ExecutableRegion {
417                    address: Some(FAKE_REGION_1_ADDRESS),
418                    name: Some(FAKE_REGION_1_NAME.to_string()),
419                    size: Some(FAKE_REGION_1_SIZE),
420                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
421                    vaddr: Some(FAKE_REGION_1_VADDR),
422                    build_id: Some(fheapdump_client::BuildId {
423                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
424                    }),
425                    ..Default::default()
426                },
427            ),
428            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
429                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
430                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
431                ..Default::default()
432            }),
433            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
434                address: Some(FAKE_ALLOCATION_1_ADDRESS),
435                size: Some(FAKE_ALLOCATION_1_SIZE),
436                thread_info_key: Some(FAKE_THREAD_1_KEY),
437                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
438                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
439                ..Default::default()
440            }),
441            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
442                thread_info_key: Some(FAKE_THREAD_1_KEY),
443                koid: Some(FAKE_THREAD_1_KOID),
444                name: Some(FAKE_THREAD_1_NAME.to_string()),
445                ..Default::default()
446            }),
447            fheapdump_client::SnapshotElement::ExecutableRegion(
448                fheapdump_client::ExecutableRegion {
449                    address: Some(FAKE_REGION_2_ADDRESS),
450                    size: Some(FAKE_REGION_2_SIZE),
451                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
452                    build_id: Some(fheapdump_client::BuildId {
453                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
454                    }),
455                    ..Default::default()
456                },
457            ),
458            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
459                thread_info_key: Some(FAKE_THREAD_2_KEY),
460                koid: Some(FAKE_THREAD_2_KOID),
461                name: Some(FAKE_THREAD_2_NAME.to_string()),
462                ..Default::default()
463            }),
464            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
465                address: Some(FAKE_ALLOCATION_2_ADDRESS),
466                size: Some(FAKE_ALLOCATION_2_SIZE),
467                thread_info_key: Some(FAKE_THREAD_2_KEY),
468                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
469                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
470                ..Default::default()
471            }),
472            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
473                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
474                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
475                ..Default::default()
476            }),
477        ]);
478        fut.await.unwrap();
479
480        // Send the end of stream marker.
481        let fut = receiver_proxy.batch(&[]);
482        fut.await.unwrap();
483
484        // Receive the snapshot we just transmitted and verify its contents.
485        let mut received_snapshot = receive_worker.await.unwrap();
486        let allocation1 = received_snapshot.allocations.swap_remove(
487            received_snapshot
488                .allocations
489                .iter()
490                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
491                .unwrap(),
492        );
493        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
494        assert_eq!(
495            allocation1.thread_info,
496            Some(Rc::new(ThreadInfo {
497                koid: FAKE_THREAD_1_KOID,
498                name: FAKE_THREAD_1_NAME.to_owned()
499            }))
500        );
501        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
502        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
503        assert_eq!(
504            allocation1.contents.as_ref().expect("contents must be set"),
505            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
506        );
507        let allocation2 = received_snapshot.allocations.swap_remove(
508            received_snapshot
509                .allocations
510                .iter()
511                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
512                .unwrap(),
513        );
514        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
515        assert_eq!(
516            allocation2.thread_info,
517            Some(Rc::new(ThreadInfo {
518                koid: FAKE_THREAD_2_KOID,
519                name: FAKE_THREAD_2_NAME.to_owned()
520            }))
521        );
522        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
523        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
524        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
525        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
526        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
527        assert_eq!(region1.name, FAKE_REGION_1_NAME);
528        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
529        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
530        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
531        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
532        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
533        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
534        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
535        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
536        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
537    }
538
539    #[fasync::run_singlethreaded(test)]
540    async fn test_two_batches() {
541        let (receiver_proxy, receiver_stream) =
542            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
543        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
544
545        // Send a first batch.
546        let fut = receiver_proxy.batch(&[
547            fheapdump_client::SnapshotElement::ExecutableRegion(
548                fheapdump_client::ExecutableRegion {
549                    address: Some(FAKE_REGION_2_ADDRESS),
550                    size: Some(FAKE_REGION_2_SIZE),
551                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
552                    build_id: Some(fheapdump_client::BuildId {
553                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
554                    }),
555                    ..Default::default()
556                },
557            ),
558            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
559                address: Some(FAKE_ALLOCATION_1_ADDRESS),
560                size: Some(FAKE_ALLOCATION_1_SIZE),
561                thread_info_key: Some(FAKE_THREAD_1_KEY),
562                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
563                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
564                ..Default::default()
565            }),
566            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
567                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
568                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
569                ..Default::default()
570            }),
571            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
572                thread_info_key: Some(FAKE_THREAD_2_KEY),
573                koid: Some(FAKE_THREAD_2_KOID),
574                name: Some(FAKE_THREAD_2_NAME.to_string()),
575                ..Default::default()
576            }),
577        ]);
578        fut.await.unwrap();
579
580        // Send another batch.
581        let fut = receiver_proxy.batch(&[
582            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
583                thread_info_key: Some(FAKE_THREAD_1_KEY),
584                koid: Some(FAKE_THREAD_1_KOID),
585                name: Some(FAKE_THREAD_1_NAME.to_string()),
586                ..Default::default()
587            }),
588            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
589                address: Some(FAKE_ALLOCATION_2_ADDRESS),
590                size: Some(FAKE_ALLOCATION_2_SIZE),
591                thread_info_key: Some(FAKE_THREAD_2_KEY),
592                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
593                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
594                ..Default::default()
595            }),
596            fheapdump_client::SnapshotElement::ExecutableRegion(
597                fheapdump_client::ExecutableRegion {
598                    address: Some(FAKE_REGION_1_ADDRESS),
599                    name: Some(FAKE_REGION_1_NAME.to_string()),
600                    size: Some(FAKE_REGION_1_SIZE),
601                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
602                    vaddr: Some(FAKE_REGION_1_VADDR),
603                    build_id: Some(fheapdump_client::BuildId {
604                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
605                    }),
606                    ..Default::default()
607                },
608            ),
609            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
610                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
611                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
612                ..Default::default()
613            }),
614            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
615                address: Some(FAKE_ALLOCATION_1_ADDRESS),
616                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
617                ..Default::default()
618            }),
619        ]);
620        fut.await.unwrap();
621
622        // Send the end of stream marker.
623        let fut = receiver_proxy.batch(&[]);
624        fut.await.unwrap();
625
626        // Receive the snapshot we just transmitted and verify its contents.
627        let mut received_snapshot = receive_worker.await.unwrap();
628        let allocation1 = received_snapshot.allocations.swap_remove(
629            received_snapshot
630                .allocations
631                .iter()
632                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
633                .unwrap(),
634        );
635        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
636        assert_eq!(
637            allocation1.thread_info,
638            Some(Rc::new(ThreadInfo {
639                koid: FAKE_THREAD_1_KOID,
640                name: FAKE_THREAD_1_NAME.to_owned()
641            }))
642        );
643        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
644        assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
645        assert_eq!(
646            allocation1.contents.as_ref().expect("contents must be set"),
647            &FAKE_ALLOCATION_1_CONTENTS.to_vec()
648        );
649        let allocation2 = received_snapshot.allocations.swap_remove(
650            received_snapshot
651                .allocations
652                .iter()
653                .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
654                .unwrap(),
655        );
656        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
657        assert_eq!(
658            allocation2.thread_info,
659            Some(Rc::new(ThreadInfo {
660                koid: FAKE_THREAD_2_KOID,
661                name: FAKE_THREAD_2_NAME.to_owned()
662            }))
663        );
664        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
665        assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
666        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
667        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
668        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
669        assert_eq!(region1.name, FAKE_REGION_1_NAME);
670        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
671        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
672        assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
673        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
674        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
675        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
676        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
677        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
678        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
679    }
680
681    #[test_case(|allocation| allocation.size = None => matches
682        Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
683    #[test_case(|allocation| allocation.stack_trace_key = None => matches
684        Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
685    #[test_case(|allocation| allocation.address = None => matches
686        Ok(_) ; "address_is_optional")]
687    #[test_case(|allocation| allocation.thread_info_key = None => matches
688        Ok(_) ; "thread_info_is_optional")]
689    #[test_case(|allocation| allocation.timestamp = None => matches
690        Ok(_) ; "timestamp_is_optional")]
691    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
692        Ok(_) ; "success")]
693    #[fasync::run_singlethreaded(test)]
694    async fn test_allocation_required_fields(
695        set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
696    ) -> Result<Snapshot, Error> {
697        let (receiver_proxy, receiver_stream) =
698            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
699        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
700
701        // Start with an Allocation with all the required fields set.
702        let mut allocation = fheapdump_client::Allocation {
703            address: Some(FAKE_ALLOCATION_1_ADDRESS),
704            size: Some(FAKE_ALLOCATION_1_SIZE),
705            thread_info_key: Some(FAKE_THREAD_1_KEY),
706            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
707            timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
708            ..Default::default()
709        };
710
711        // Set one of the fields to None, according to the case being tested.
712        set_one_field_to_none(&mut allocation);
713
714        // Send it to the SnapshotReceiver along with the thread info and stack trace it references.
715        let fut = receiver_proxy.batch(&[
716            fheapdump_client::SnapshotElement::Allocation(allocation),
717            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
718                thread_info_key: Some(FAKE_THREAD_1_KEY),
719                koid: Some(FAKE_THREAD_1_KOID),
720                name: Some(FAKE_THREAD_1_NAME.to_string()),
721                ..Default::default()
722            }),
723            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
724                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
725                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
726                ..Default::default()
727            }),
728        ]);
729        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
730
731        // Send the end of stream marker.
732        let fut = receiver_proxy.batch(&[]);
733        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
734
735        // Return the result.
736        receive_worker.await
737    }
738
739    #[test_case(|thread_info| thread_info.thread_info_key = None => matches
740        Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
741    #[test_case(|thread_info| thread_info.koid = None => matches
742        Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
743    #[test_case(|thread_info| thread_info.name = None => matches
744        Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
745    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
746        Ok(_) ; "success")]
747    #[fasync::run_singlethreaded(test)]
748    async fn test_thread_info_required_fields(
749        set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
750    ) -> Result<Snapshot, Error> {
751        let (receiver_proxy, receiver_stream) =
752            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
753        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
754
755        // Start with a ThreadInfo with all the required fields set.
756        let mut thread_info = fheapdump_client::ThreadInfo {
757            thread_info_key: Some(FAKE_THREAD_1_KEY),
758            koid: Some(FAKE_THREAD_1_KOID),
759            name: Some(FAKE_THREAD_1_NAME.to_string()),
760            ..Default::default()
761        };
762
763        // Set one of the fields to None, according to the case being tested.
764        set_one_field_to_none(&mut thread_info);
765
766        // Send it to the SnapshotReceiver.
767        let fut =
768            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
769        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
770
771        // Send the end of stream marker.
772        let fut = receiver_proxy.batch(&[]);
773        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
774
775        // Return the result.
776        receive_worker.await
777    }
778
779    #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
780        Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
781    #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
782        Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
783    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
784        Ok(_) ; "success")]
785    #[fasync::run_singlethreaded(test)]
786    async fn test_stack_trace_required_fields(
787        set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
788    ) -> Result<Snapshot, Error> {
789        let (receiver_proxy, receiver_stream) =
790            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
791        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
792
793        // Start with a StackTrace with all the required fields set.
794        let mut stack_trace = fheapdump_client::StackTrace {
795            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
796            program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
797            ..Default::default()
798        };
799
800        // Set one of the fields to None, according to the case being tested.
801        set_one_field_to_none(&mut stack_trace);
802
803        // Send it to the SnapshotReceiver.
804        let fut =
805            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
806        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
807
808        // Send the end of stream marker.
809        let fut = receiver_proxy.batch(&[]);
810        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
811
812        // Return the result.
813        receive_worker.await
814    }
815
816    #[test_case(|region| region.address = None => matches
817        Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
818    #[test_case(|region| region.size = None => matches
819        Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
820    #[test_case(|region| region.file_offset = None => matches
821        Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
822    #[test_case(|region| region.build_id = None => matches
823        Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
824    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
825        Ok(_) ; "success")]
826    #[fasync::run_singlethreaded(test)]
827    async fn test_executable_region_required_fields(
828        set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
829    ) -> Result<Snapshot, Error> {
830        let (receiver_proxy, receiver_stream) =
831            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
832        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
833
834        // Start with an ExecutableRegion with all the required fields set.
835        let mut region = fheapdump_client::ExecutableRegion {
836            address: Some(FAKE_REGION_1_ADDRESS),
837            size: Some(FAKE_REGION_1_SIZE),
838            file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
839            build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
840            ..Default::default()
841        };
842
843        // Set one of the fields to None, according to the case being tested.
844        set_one_field_to_none(&mut region);
845
846        // Send it to the SnapshotReceiver.
847        let fut =
848            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
849        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
850
851        // Send the end of stream marker.
852        let fut = receiver_proxy.batch(&[]);
853        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
854
855        // Return the result.
856        receive_worker.await
857    }
858
859    #[test_case(|block_contents| block_contents.address = None => matches
860        Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
861    #[test_case(|block_contents| block_contents.contents = None => matches
862        Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
863    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
864        Ok(_) ; "success")]
865    #[fasync::run_singlethreaded(test)]
866    async fn test_block_contents_required_fields(
867        set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
868    ) -> Result<Snapshot, Error> {
869        let (receiver_proxy, receiver_stream) =
870            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
871        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
872
873        // Start with a BlockContents with all the required fields set.
874        let mut block_contents = fheapdump_client::BlockContents {
875            address: Some(FAKE_ALLOCATION_1_ADDRESS),
876            contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
877            ..Default::default()
878        };
879
880        // Set one of the fields to None, according to the case being tested.
881        set_one_field_to_none(&mut block_contents);
882
883        // Send it to the SnapshotReceiver along with the allocation it references.
884        let fut = receiver_proxy.batch(&[
885            fheapdump_client::SnapshotElement::BlockContents(block_contents),
886            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
887                address: Some(FAKE_ALLOCATION_1_ADDRESS),
888                size: Some(FAKE_ALLOCATION_1_SIZE),
889                thread_info_key: Some(FAKE_THREAD_1_KEY),
890                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
891                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
892                ..Default::default()
893            }),
894            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
895                thread_info_key: Some(FAKE_THREAD_1_KEY),
896                koid: Some(FAKE_THREAD_1_KOID),
897                name: Some(FAKE_THREAD_1_NAME.to_string()),
898                ..Default::default()
899            }),
900            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
901                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
902                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
903                ..Default::default()
904            }),
905        ]);
906        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
907
908        // Send the end of stream marker.
909        let fut = receiver_proxy.batch(&[]);
910        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
911
912        // Return the result.
913        receive_worker.await
914    }
915
916    #[fasync::run_singlethreaded(test)]
917    async fn test_conflicting_allocations() {
918        let (receiver_proxy, receiver_stream) =
919            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
920        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
921
922        // Send two allocations with the same address along with the stack trace they reference.
923        let fut = receiver_proxy.batch(&[
924            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
925                address: Some(FAKE_ALLOCATION_1_ADDRESS),
926                size: Some(FAKE_ALLOCATION_1_SIZE),
927                thread_info_key: Some(FAKE_THREAD_1_KEY),
928                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
929                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
930                ..Default::default()
931            }),
932            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
933                address: Some(FAKE_ALLOCATION_1_ADDRESS),
934                size: Some(FAKE_ALLOCATION_1_SIZE),
935                thread_info_key: Some(FAKE_THREAD_1_KEY),
936                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
937                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
938                ..Default::default()
939            }),
940            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
941                thread_info_key: Some(FAKE_THREAD_1_KEY),
942                koid: Some(FAKE_THREAD_1_KOID),
943                name: Some(FAKE_THREAD_1_NAME.to_string()),
944                ..Default::default()
945            }),
946            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
947                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
948                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
949                ..Default::default()
950            }),
951        ]);
952        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
953
954        // Send the end of stream marker.
955        let fut = receiver_proxy.batch(&[]);
956        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
957
958        // Verify expected error.
959        assert_matches!(
960            receive_worker.await,
961            Err(Error::ConflictingElement { element_type: "Allocation" })
962        );
963    }
964
965    #[fasync::run_singlethreaded(test)]
966    async fn test_conflicting_executable_regions() {
967        let (receiver_proxy, receiver_stream) =
968            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
969        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
970
971        // Send two executable regions with the same address.
972        let fut = receiver_proxy.batch(&[
973            fheapdump_client::SnapshotElement::ExecutableRegion(
974                fheapdump_client::ExecutableRegion {
975                    address: Some(FAKE_REGION_1_ADDRESS),
976                    size: Some(FAKE_REGION_1_SIZE),
977                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
978                    build_id: Some(fheapdump_client::BuildId {
979                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
980                    }),
981                    ..Default::default()
982                },
983            ),
984            fheapdump_client::SnapshotElement::ExecutableRegion(
985                fheapdump_client::ExecutableRegion {
986                    address: Some(FAKE_REGION_1_ADDRESS),
987                    size: Some(FAKE_REGION_1_SIZE),
988                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
989                    build_id: Some(fheapdump_client::BuildId {
990                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
991                    }),
992                    ..Default::default()
993                },
994            ),
995        ]);
996        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
997
998        // Send the end of stream marker.
999        let fut = receiver_proxy.batch(&[]);
1000        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1001
1002        // Verify expected error.
1003        assert_matches!(
1004            receive_worker.await,
1005            Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
1006        );
1007    }
1008
1009    #[fasync::run_singlethreaded(test)]
1010    async fn test_block_contents_wrong_size() {
1011        let (receiver_proxy, receiver_stream) =
1012            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1013        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1014
1015        // Send an allocation whose BlockContents has the wrong size.
1016        let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
1017        let fut = receiver_proxy.batch(&[
1018            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1019                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1020                size: Some(FAKE_ALLOCATION_1_SIZE),
1021                thread_info_key: Some(FAKE_THREAD_1_KEY),
1022                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1023                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1024                ..Default::default()
1025            }),
1026            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1027                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1028                contents: Some(contents_with_wrong_size),
1029                ..Default::default()
1030            }),
1031            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1032                thread_info_key: Some(FAKE_THREAD_1_KEY),
1033                koid: Some(FAKE_THREAD_1_KOID),
1034                name: Some(FAKE_THREAD_1_NAME.to_string()),
1035                ..Default::default()
1036            }),
1037            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1038                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1039                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1040                ..Default::default()
1041            }),
1042        ]);
1043        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1044
1045        // Send the end of stream marker.
1046        let fut = receiver_proxy.batch(&[]);
1047        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
1048
1049        // Verify expected error.
1050        assert_matches!(
1051            receive_worker.await,
1052            Err(Error::ConflictingElement { element_type: "BlockContents" })
1053        );
1054    }
1055
1056    #[fasync::run_singlethreaded(test)]
1057    async fn test_empty_stack_trace() {
1058        let (receiver_proxy, receiver_stream) =
1059            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1060        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1061
1062        // Send an allocation that references an empty stack trace.
1063        let fut = receiver_proxy.batch(&[
1064            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1065                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1066                size: Some(FAKE_ALLOCATION_1_SIZE),
1067                thread_info_key: Some(FAKE_THREAD_1_KEY),
1068                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1069                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1070                ..Default::default()
1071            }),
1072            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1073                thread_info_key: Some(FAKE_THREAD_1_KEY),
1074                koid: Some(FAKE_THREAD_1_KOID),
1075                name: Some(FAKE_THREAD_1_NAME.to_string()),
1076                ..Default::default()
1077            }),
1078            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1079                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1080                program_addresses: Some(vec![]),
1081                ..Default::default()
1082            }),
1083        ]);
1084        fut.await.unwrap();
1085
1086        // Send the end of stream marker.
1087        let fut = receiver_proxy.batch(&[]);
1088        fut.await.unwrap();
1089
1090        // Verify that the stack trace has been reconstructed correctly.
1091        let received_snapshot = receive_worker.await.unwrap();
1092        let allocation1 = received_snapshot
1093            .allocations
1094            .iter()
1095            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1096            .unwrap();
1097        assert_eq!(allocation1.stack_trace.program_addresses, []);
1098    }
1099
1100    #[fasync::run_singlethreaded(test)]
1101    async fn test_chunked_stack_trace() {
1102        let (receiver_proxy, receiver_stream) =
1103            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1104        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1105
1106        // Send an allocation and the first chunk of its stack trace.
1107        let fut = receiver_proxy.batch(&[
1108            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1109                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1110                size: Some(FAKE_ALLOCATION_1_SIZE),
1111                thread_info_key: Some(FAKE_THREAD_1_KEY),
1112                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1113                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1114                ..Default::default()
1115            }),
1116            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1117                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1118                program_addresses: Some(vec![1111, 2222]),
1119                ..Default::default()
1120            }),
1121            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1122                thread_info_key: Some(FAKE_THREAD_1_KEY),
1123                koid: Some(FAKE_THREAD_1_KOID),
1124                name: Some(FAKE_THREAD_1_NAME.to_string()),
1125                ..Default::default()
1126            }),
1127        ]);
1128        fut.await.unwrap();
1129
1130        // Send the second chunk.
1131        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
1132            fheapdump_client::StackTrace {
1133                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1134                program_addresses: Some(vec![3333]),
1135                ..Default::default()
1136            },
1137        )]);
1138        fut.await.unwrap();
1139
1140        // Send the end of stream marker.
1141        let fut = receiver_proxy.batch(&[]);
1142        fut.await.unwrap();
1143
1144        // Verify that the stack trace has been reconstructed correctly.
1145        let received_snapshot = receive_worker.await.unwrap();
1146        let allocation1 = received_snapshot
1147            .allocations
1148            .iter()
1149            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1150            .unwrap();
1151        assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
1152    }
1153
1154    #[fasync::run_singlethreaded(test)]
1155    async fn test_empty_block_contents() {
1156        let (receiver_proxy, receiver_stream) =
1157            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1158        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1159
1160        // Send a zero-sized allocation and its empty contents.
1161        let fut = receiver_proxy.batch(&[
1162            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1163                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1164                size: Some(0),
1165                thread_info_key: Some(FAKE_THREAD_1_KEY),
1166                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1167                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1168                ..Default::default()
1169            }),
1170            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1171                thread_info_key: Some(FAKE_THREAD_1_KEY),
1172                koid: Some(FAKE_THREAD_1_KOID),
1173                name: Some(FAKE_THREAD_1_NAME.to_string()),
1174                ..Default::default()
1175            }),
1176            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1177                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1178                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1179                ..Default::default()
1180            }),
1181            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1182                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1183                contents: Some(vec![]),
1184                ..Default::default()
1185            }),
1186        ]);
1187        fut.await.unwrap();
1188
1189        // Send the end of stream marker.
1190        let fut = receiver_proxy.batch(&[]);
1191        fut.await.unwrap();
1192
1193        // Verify that the allocation has been reconstructed correctly.
1194        let received_snapshot = receive_worker.await.unwrap();
1195        let allocation1 = received_snapshot
1196            .allocations
1197            .iter()
1198            .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1199            .unwrap();
1200        assert_eq!(allocation1.contents.as_ref().expect("contents must be set"), &vec![]);
1201    }
1202
1203    #[fasync::run_singlethreaded(test)]
1204    async fn test_chunked_block_contents() {
1205        let (receiver_proxy, receiver_stream) =
1206            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1207        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1208
1209        // Split the contents in two halves.
1210        let (content_first_chunk, contents_second_chunk) =
1211            FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1212
1213        // Send an allocation and the first chunk of its contents.
1214        let fut = receiver_proxy.batch(&[
1215            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1216                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1217                size: Some(FAKE_ALLOCATION_1_SIZE),
1218                thread_info_key: Some(FAKE_THREAD_1_KEY),
1219                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1220                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1221                ..Default::default()
1222            }),
1223            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1224                thread_info_key: Some(FAKE_THREAD_1_KEY),
1225                koid: Some(FAKE_THREAD_1_KOID),
1226                name: Some(FAKE_THREAD_1_NAME.to_string()),
1227                ..Default::default()
1228            }),
1229            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1230                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1231                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1232                ..Default::default()
1233            }),
1234            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1235                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1236                contents: Some(content_first_chunk.to_vec()),
1237                ..Default::default()
1238            }),
1239        ]);
1240        fut.await.unwrap();
1241
1242        // Send the second chunk.
1243        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1244            fheapdump_client::BlockContents {
1245                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1246                contents: Some(contents_second_chunk.to_vec()),
1247                ..Default::default()
1248            },
1249        )]);
1250        fut.await.unwrap();
1251
1252        // Send the end of stream marker.
1253        let fut = receiver_proxy.batch(&[]);
1254        fut.await.unwrap();
1255
1256        // Verify that the allocation's block contents have been reconstructed correctly.
1257        let received_snapshot = receive_worker.await.unwrap();
1258        let allocation1 = received_snapshot
1259            .allocations
1260            .iter()
1261            .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1262            .unwrap();
1263        assert_eq!(allocation1.contents, Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()));
1264    }
1265
1266    #[fasync::run_singlethreaded(test)]
1267    async fn test_missing_end_of_stream() {
1268        let (receiver_proxy, receiver_stream) =
1269            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1270        let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1271
1272        // Send an allocation and its stack trace.
1273        let fut = receiver_proxy.batch(&[
1274            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1275                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1276                size: Some(FAKE_ALLOCATION_1_SIZE),
1277                thread_info_key: Some(FAKE_THREAD_1_KEY),
1278                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1279                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1280                ..Default::default()
1281            }),
1282            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1283                thread_info_key: Some(FAKE_THREAD_1_KEY),
1284                koid: Some(FAKE_THREAD_1_KOID),
1285                name: Some(FAKE_THREAD_1_NAME.to_string()),
1286                ..Default::default()
1287            }),
1288            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1289                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1290                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1291                ..Default::default()
1292            }),
1293        ]);
1294        fut.await.unwrap();
1295
1296        // Close the channel without sending an end of stream marker.
1297        std::mem::drop(receiver_proxy);
1298
1299        // Expect an UnexpectedEndOfStream error.
1300        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1301    }
1302
1303    #[fasync::run_singlethreaded(test)]
1304    async fn test_multi_contents() {
1305        let (receiver_proxy, receiver_stream) =
1306            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1307        let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1308
1309        // Send two snapshots with different KOIDs.
1310        for koid in [1111, 2222] {
1311            receiver_proxy
1312                .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1313                    fheapdump_client::SnapshotHeader {
1314                        process_name: Some(format!("test-process-{koid}")),
1315                        process_koid: Some(koid),
1316                        ..Default::default()
1317                    },
1318                )])
1319                .await
1320                .unwrap();
1321
1322            receiver_proxy
1323                .batch(&[
1324                    fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1325                        address: Some(FAKE_ALLOCATION_1_ADDRESS),
1326                        size: Some(FAKE_ALLOCATION_1_SIZE),
1327                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1328                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1329                        timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1330                        ..Default::default()
1331                    }),
1332                    fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1333                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1334                        koid: Some(FAKE_THREAD_1_KOID),
1335                        name: Some(FAKE_THREAD_1_NAME.to_string()),
1336                        ..Default::default()
1337                    }),
1338                    fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1339                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1340                        program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1341                        ..Default::default()
1342                    }),
1343                ])
1344                .await
1345                .unwrap();
1346
1347            receiver_proxy.batch(&[]).await.unwrap(); // end of snapshot
1348        }
1349
1350        // Send end of stream marker.
1351        receiver_proxy.batch(&[]).await.unwrap();
1352
1353        // Validate the received snapshots.
1354        let received_snapshots = receive_worker.await.unwrap();
1355        assert_eq!(received_snapshots.len(), 2);
1356        assert_eq!(received_snapshots[0].process_name, "test-process-1111");
1357        assert_eq!(received_snapshots[0].process_koid, 1111);
1358        assert_eq!(received_snapshots[1].process_name, "test-process-2222");
1359        assert_eq!(received_snapshots[1].process_koid, 2222);
1360    }
1361
1362    #[fasync::run_singlethreaded(test)]
1363    async fn test_multi_missing_end_of_stream() {
1364        let (receiver_proxy, receiver_stream) =
1365            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1366        let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1367
1368        // Send two snapshots with different KOIDs.
1369        for koid in [1111, 2222] {
1370            receiver_proxy
1371                .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1372                    fheapdump_client::SnapshotHeader {
1373                        process_name: Some("test-process-name".to_string()),
1374                        process_koid: Some(koid),
1375                        ..Default::default()
1376                    },
1377                )])
1378                .await
1379                .unwrap();
1380
1381            receiver_proxy
1382                .batch(&[
1383                    fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1384                        address: Some(FAKE_ALLOCATION_1_ADDRESS),
1385                        size: Some(FAKE_ALLOCATION_1_SIZE),
1386                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1387                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1388                        timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1389                        ..Default::default()
1390                    }),
1391                    fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1392                        thread_info_key: Some(FAKE_THREAD_1_KEY),
1393                        koid: Some(FAKE_THREAD_1_KOID),
1394                        name: Some(FAKE_THREAD_1_NAME.to_string()),
1395                        ..Default::default()
1396                    }),
1397                    fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1398                        stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1399                        program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1400                        ..Default::default()
1401                    }),
1402                ])
1403                .await
1404                .unwrap();
1405
1406            receiver_proxy.batch(&[]).await.unwrap(); // end of snapshot
1407        }
1408
1409        // Close the channel without sending an end of stream marker.
1410        std::mem::drop(receiver_proxy);
1411
1412        // Expect an UnexpectedEndOfStream error.
1413        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1414    }
1415}