heapdump_snapshot/
snapshot.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_memory_heapdump_client as fheapdump_client;
6use futures::StreamExt;
7use std::collections::HashMap;
8use std::rc::Rc;
9
10use crate::Error;
11
12/// Contains all the data received over a `SnapshotReceiver` channel.
13#[derive(Debug)]
14pub struct Snapshot {
15    /// All the live allocations in the analyzed process, indexed by memory address.
16    pub allocations: HashMap<u64, Allocation>,
17
18    /// All the executable memory regions in the analyzed process, indexed by start address.
19    pub executable_regions: HashMap<u64, ExecutableRegion>,
20}
21
22/// Information about an allocated memory block and, optionally, its contents.
23#[derive(Debug)]
24pub struct Allocation {
25    /// Block size, in bytes.
26    pub size: u64,
27
28    /// The allocating thread.
29    pub thread_info: Rc<ThreadInfo>,
30
31    /// The stack trace of the allocation site.
32    pub stack_trace: Rc<StackTrace>,
33
34    /// Allocation timestamp, in nanoseconds.
35    pub timestamp: fidl::MonotonicInstant,
36
37    /// Memory dump of this block's contents.
38    pub contents: Option<Vec<u8>>,
39}
40
41/// A stack trace.
42#[derive(Debug)]
43pub struct StackTrace {
44    /// Code addresses at each call frame. The first entry corresponds to the leaf call.
45    pub program_addresses: Vec<u64>,
46}
47
48/// A memory region containing code loaded from an ELF file.
49#[derive(Debug)]
50pub struct ExecutableRegion {
51    /// Region size, in bytes.
52    pub size: u64,
53
54    /// The corresponding offset in the ELF file.
55    pub file_offset: u64,
56
57    /// The Build ID of the ELF file.
58    pub build_id: Vec<u8>,
59}
60
61/// Information identifying a specific thread.
62#[derive(Debug)]
63pub struct ThreadInfo {
64    /// The thread's koid.
65    pub koid: zx_types::zx_koid_t,
66
67    /// The thread's name.
68    pub name: String,
69}
70
71/// Gets the value of a field in a FIDL table as a `Result<T, Error>`.
72///
73/// An `Err(Error::MissingField { .. })` is returned if the field's value is `None`.
74///
75/// Usage: `read_field!(container_expression => ContainerType, field_name)`
76///
77/// # Example
78///
79/// ```
80/// struct MyFidlTable { field: Option<u32>, .. }
81/// let table = MyFidlTable { field: Some(44), .. };
82///
83/// let val = read_field!(table => MyFidlTable, field)?;
84/// ```
85macro_rules! read_field {
86    ($e:expr => $c:ident, $f:ident) => {
87        $e.$f.ok_or(Error::MissingField {
88            container: std::stringify!($c),
89            field: std::stringify!($f),
90        })
91    };
92}
93
94impl Snapshot {
95    /// Receives data over a `SnapshotReceiver` channel and reassembles it.
96    pub async fn receive_from(
97        mut stream: fheapdump_client::SnapshotReceiverRequestStream,
98    ) -> Result<Snapshot, Error> {
99        let mut allocations: HashMap<u64, (u64, u64, u64, fidl::MonotonicInstant)> = HashMap::new();
100        let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
101        let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
102        let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
103        let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
104
105        loop {
106            // Wait for the next batch of elements.
107            let batch = match stream.next().await.transpose()? {
108                Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
109                    // Send acknowledgment as quickly as possible, then keep processing the received batch.
110                    responder.send()?;
111                    batch
112                }
113                Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
114                    error,
115                    responder,
116                }) => {
117                    let _ = responder.send(); // Ignore the result of the acknowledgment.
118                    return Err(Error::CollectorError(error));
119                }
120                None => return Err(Error::UnexpectedEndOfStream),
121            };
122
123            // Process data. An empty batch signals the end of the stream.
124            if !batch.is_empty() {
125                for element in batch {
126                    match element {
127                        fheapdump_client::SnapshotElement::Allocation(allocation) => {
128                            let address = read_field!(allocation => Allocation, address)?;
129                            let size = read_field!(allocation => Allocation, size)?;
130                            let timestamp = read_field!(allocation => Allocation, timestamp)?;
131                            let thread_info_key =
132                                read_field!(allocation => Allocation, thread_info_key)?;
133                            let stack_trace_key =
134                                read_field!(allocation => Allocation, stack_trace_key)?;
135                            if allocations
136                                .insert(
137                                    address,
138                                    (size, thread_info_key, stack_trace_key, timestamp),
139                                )
140                                .is_some()
141                            {
142                                return Err(Error::ConflictingElement {
143                                    element_type: "Allocation",
144                                });
145                            }
146                        }
147                        fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
148                            let stack_trace_key =
149                                read_field!(stack_trace => StackTrace, stack_trace_key)?;
150                            let mut program_addresses =
151                                read_field!(stack_trace => StackTrace, program_addresses)?;
152                            stack_traces
153                                .entry(stack_trace_key)
154                                .or_default()
155                                .append(&mut program_addresses);
156                        }
157                        fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
158                            let thread_info_key =
159                                read_field!(thread_info => ThreadInfo, thread_info_key)?;
160                            let koid = read_field!(thread_info => ThreadInfo, koid)?;
161                            let name = read_field!(thread_info => ThreadInfo, name)?;
162                            if thread_infos
163                                .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
164                                .is_some()
165                            {
166                                return Err(Error::ConflictingElement {
167                                    element_type: "ThreadInfo",
168                                });
169                            }
170                        }
171                        fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
172                            let address = read_field!(region => ExecutableRegion, address)?;
173                            let size = read_field!(region => ExecutableRegion, size)?;
174                            let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
175                            let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
176                            let region = ExecutableRegion { size, file_offset, build_id };
177                            if executable_regions.insert(address, region).is_some() {
178                                return Err(Error::ConflictingElement {
179                                    element_type: "ExecutableRegion",
180                                });
181                            }
182                        }
183                        fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
184                            let address = read_field!(block_contents => BlockContents, address)?;
185                            let mut chunk = read_field!(block_contents => BlockContents, contents)?;
186                            contents.entry(address).or_default().append(&mut chunk);
187                        }
188                        _ => return Err(Error::UnexpectedElementType),
189                    }
190                }
191            } else {
192                // We are at the end of the stream. Convert to the final types and resolve
193                // cross-references.
194                let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
195                    .into_iter()
196                    .map(|(key, program_addresses)| {
197                        (key, Rc::new(StackTrace { program_addresses }))
198                    })
199                    .collect();
200                let mut final_allocations = HashMap::new();
201                for (address, (size, thread_info_key, stack_trace_key, timestamp)) in allocations {
202                    let thread_info = thread_infos
203                        .get(&thread_info_key)
204                        .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
205                        .clone();
206                    let stack_trace = final_stack_traces
207                        .get(&stack_trace_key)
208                        .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
209                        .clone();
210                    let contents = match contents.remove(&address) {
211                        Some(data) if data.len() as u64 != size => {
212                            return Err(Error::ConflictingElement { element_type: "BlockContents" })
213                        }
214                        other => other,
215                    };
216                    final_allocations.insert(
217                        address,
218                        Allocation { size, thread_info, stack_trace, timestamp, contents },
219                    );
220                }
221
222                return Ok(Snapshot { allocations: final_allocations, executable_regions });
223            }
224        }
225    }
226}
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231    use assert_matches::assert_matches;
232    use fidl::endpoints::create_proxy_and_stream;
233    use fuchsia_async as fasync;
234    use test_case::test_case;
235
236    // Constants used by some of the tests below:
237    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
238    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
239    const FAKE_ALLOCATION_1_TIMESTAMP: fidl::MonotonicInstant =
240        fidl::MonotonicInstant::from_nanos(888888888);
241    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
242    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
243    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
244    const FAKE_ALLOCATION_2_TIMESTAMP: fidl::MonotonicInstant =
245        fidl::MonotonicInstant::from_nanos(-777777777); // test negative value too
246    const FAKE_THREAD_1_KOID: u64 = 1212;
247    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
248    const FAKE_THREAD_1_KEY: u64 = 4567;
249    const FAKE_THREAD_2_KOID: u64 = 1213;
250    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
251    const FAKE_THREAD_2_KEY: u64 = 7654;
252    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
253    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
254    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
255    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;
256    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
257    const FAKE_REGION_1_SIZE: u64 = 0x80000;
258    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
259    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
260    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
261    const FAKE_REGION_2_SIZE: u64 = 0x200000;
262    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
263    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
264
265    #[fasync::run_singlethreaded(test)]
266    async fn test_empty() {
267        let (receiver_proxy, receiver_stream) =
268            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
269        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
270
271        // Send the end of stream marker.
272        let fut = receiver_proxy.batch(&[]);
273        fut.await.unwrap();
274
275        // Receive the snapshot we just transmitted and verify that it is empty.
276        let received_snapshot = receive_worker.await.unwrap();
277        assert!(received_snapshot.allocations.is_empty());
278        assert!(received_snapshot.executable_regions.is_empty());
279    }
280
281    #[fasync::run_singlethreaded(test)]
282    async fn test_one_batch() {
283        let (receiver_proxy, receiver_stream) =
284            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
285        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
286
287        // Send a batch containing two allocations - whose threads, stack traces and contents can be
288        // listed before or after the allocation(s) that reference them - and two executable
289        // regions.
290        let fut = receiver_proxy.batch(&[
291            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
292                address: Some(FAKE_ALLOCATION_1_ADDRESS),
293                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
294                ..Default::default()
295            }),
296            fheapdump_client::SnapshotElement::ExecutableRegion(
297                fheapdump_client::ExecutableRegion {
298                    address: Some(FAKE_REGION_1_ADDRESS),
299                    size: Some(FAKE_REGION_1_SIZE),
300                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
301                    build_id: Some(fheapdump_client::BuildId {
302                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
303                    }),
304                    ..Default::default()
305                },
306            ),
307            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
308                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
309                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
310                ..Default::default()
311            }),
312            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
313                address: Some(FAKE_ALLOCATION_1_ADDRESS),
314                size: Some(FAKE_ALLOCATION_1_SIZE),
315                thread_info_key: Some(FAKE_THREAD_1_KEY),
316                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
317                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
318                ..Default::default()
319            }),
320            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
321                thread_info_key: Some(FAKE_THREAD_1_KEY),
322                koid: Some(FAKE_THREAD_1_KOID),
323                name: Some(FAKE_THREAD_1_NAME.to_string()),
324                ..Default::default()
325            }),
326            fheapdump_client::SnapshotElement::ExecutableRegion(
327                fheapdump_client::ExecutableRegion {
328                    address: Some(FAKE_REGION_2_ADDRESS),
329                    size: Some(FAKE_REGION_2_SIZE),
330                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
331                    build_id: Some(fheapdump_client::BuildId {
332                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
333                    }),
334                    ..Default::default()
335                },
336            ),
337            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
338                thread_info_key: Some(FAKE_THREAD_2_KEY),
339                koid: Some(FAKE_THREAD_2_KOID),
340                name: Some(FAKE_THREAD_2_NAME.to_string()),
341                ..Default::default()
342            }),
343            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
344                address: Some(FAKE_ALLOCATION_2_ADDRESS),
345                size: Some(FAKE_ALLOCATION_2_SIZE),
346                thread_info_key: Some(FAKE_THREAD_2_KEY),
347                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
348                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
349                ..Default::default()
350            }),
351            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
352                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
353                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
354                ..Default::default()
355            }),
356        ]);
357        fut.await.unwrap();
358
359        // Send the end of stream marker.
360        let fut = receiver_proxy.batch(&[]);
361        fut.await.unwrap();
362
363        // Receive the snapshot we just transmitted and verify its contents.
364        let mut received_snapshot = receive_worker.await.unwrap();
365        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
366        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
367        assert_eq!(allocation1.thread_info.koid, FAKE_THREAD_1_KOID);
368        assert_eq!(allocation1.thread_info.name, FAKE_THREAD_1_NAME);
369        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
370        assert_eq!(allocation1.timestamp, FAKE_ALLOCATION_1_TIMESTAMP);
371        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
372        let allocation2 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_2_ADDRESS).unwrap();
373        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
374        assert_eq!(allocation2.thread_info.koid, FAKE_THREAD_2_KOID);
375        assert_eq!(allocation2.thread_info.name, FAKE_THREAD_2_NAME);
376        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
377        assert_eq!(allocation2.timestamp, FAKE_ALLOCATION_2_TIMESTAMP);
378        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
379        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
380        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
381        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
382        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
383        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
384        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
385        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
386        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
387        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
388        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
389    }
390
391    #[fasync::run_singlethreaded(test)]
392    async fn test_two_batches() {
393        let (receiver_proxy, receiver_stream) =
394            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
395        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
396
397        // Send a first batch.
398        let fut = receiver_proxy.batch(&[
399            fheapdump_client::SnapshotElement::ExecutableRegion(
400                fheapdump_client::ExecutableRegion {
401                    address: Some(FAKE_REGION_2_ADDRESS),
402                    size: Some(FAKE_REGION_2_SIZE),
403                    file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
404                    build_id: Some(fheapdump_client::BuildId {
405                        value: FAKE_REGION_2_BUILD_ID.to_vec(),
406                    }),
407                    ..Default::default()
408                },
409            ),
410            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
411                address: Some(FAKE_ALLOCATION_1_ADDRESS),
412                size: Some(FAKE_ALLOCATION_1_SIZE),
413                thread_info_key: Some(FAKE_THREAD_1_KEY),
414                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
415                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
416                ..Default::default()
417            }),
418            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
419                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
420                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
421                ..Default::default()
422            }),
423            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
424                thread_info_key: Some(FAKE_THREAD_2_KEY),
425                koid: Some(FAKE_THREAD_2_KOID),
426                name: Some(FAKE_THREAD_2_NAME.to_string()),
427                ..Default::default()
428            }),
429        ]);
430        fut.await.unwrap();
431
432        // Send another batch.
433        let fut = receiver_proxy.batch(&[
434            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
435                thread_info_key: Some(FAKE_THREAD_1_KEY),
436                koid: Some(FAKE_THREAD_1_KOID),
437                name: Some(FAKE_THREAD_1_NAME.to_string()),
438                ..Default::default()
439            }),
440            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
441                address: Some(FAKE_ALLOCATION_2_ADDRESS),
442                size: Some(FAKE_ALLOCATION_2_SIZE),
443                thread_info_key: Some(FAKE_THREAD_2_KEY),
444                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
445                timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
446                ..Default::default()
447            }),
448            fheapdump_client::SnapshotElement::ExecutableRegion(
449                fheapdump_client::ExecutableRegion {
450                    address: Some(FAKE_REGION_1_ADDRESS),
451                    size: Some(FAKE_REGION_1_SIZE),
452                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
453                    build_id: Some(fheapdump_client::BuildId {
454                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
455                    }),
456                    ..Default::default()
457                },
458            ),
459            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
460                stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
461                program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
462                ..Default::default()
463            }),
464            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
465                address: Some(FAKE_ALLOCATION_1_ADDRESS),
466                contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
467                ..Default::default()
468            }),
469        ]);
470        fut.await.unwrap();
471
472        // Send the end of stream marker.
473        let fut = receiver_proxy.batch(&[]);
474        fut.await.unwrap();
475
476        // Receive the snapshot we just transmitted and verify its contents.
477        let mut received_snapshot = receive_worker.await.unwrap();
478        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
479        assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
480        assert_eq!(allocation1.thread_info.koid, FAKE_THREAD_1_KOID);
481        assert_eq!(allocation1.thread_info.name, FAKE_THREAD_1_NAME);
482        assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
483        assert_eq!(allocation1.timestamp, FAKE_ALLOCATION_1_TIMESTAMP);
484        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
485        let allocation2 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_2_ADDRESS).unwrap();
486        assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
487        assert_eq!(allocation2.thread_info.koid, FAKE_THREAD_2_KOID);
488        assert_eq!(allocation2.thread_info.name, FAKE_THREAD_2_NAME);
489        assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
490        assert_eq!(allocation2.timestamp, FAKE_ALLOCATION_2_TIMESTAMP);
491        assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
492        assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
493        let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
494        assert_eq!(region1.size, FAKE_REGION_1_SIZE);
495        assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
496        assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
497        let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
498        assert_eq!(region2.size, FAKE_REGION_2_SIZE);
499        assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
500        assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
501        assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
502    }
503
504    #[test_case(|allocation| allocation.address = None => matches
505        Err(Error::MissingField { container: "Allocation", field: "address" }) ; "address")]
506    #[test_case(|allocation| allocation.size = None => matches
507        Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
508    #[test_case(|allocation| allocation.thread_info_key = None => matches
509        Err(Error::MissingField { container: "Allocation", field: "thread_info_key" }) ; "thread_info_key")]
510    #[test_case(|allocation| allocation.stack_trace_key = None => matches
511        Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
512    #[test_case(|allocation| allocation.timestamp = None => matches
513        Err(Error::MissingField { container: "Allocation", field: "timestamp" }) ; "timestamp")]
514    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
515        Ok(_) ; "success")]
516    #[fasync::run_singlethreaded(test)]
517    async fn test_allocation_required_fields(
518        set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
519    ) -> Result<Snapshot, Error> {
520        let (receiver_proxy, receiver_stream) =
521            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
522        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
523
524        // Start with an Allocation with all the required fields set.
525        let mut allocation = fheapdump_client::Allocation {
526            address: Some(FAKE_ALLOCATION_1_ADDRESS),
527            size: Some(FAKE_ALLOCATION_1_SIZE),
528            thread_info_key: Some(FAKE_THREAD_1_KEY),
529            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
530            timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
531            ..Default::default()
532        };
533
534        // Set one of the fields to None, according to the case being tested.
535        set_one_field_to_none(&mut allocation);
536
537        // Send it to the SnapshotReceiver along with the thread info and stack trace it references.
538        let fut = receiver_proxy.batch(&[
539            fheapdump_client::SnapshotElement::Allocation(allocation),
540            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
541                thread_info_key: Some(FAKE_THREAD_1_KEY),
542                koid: Some(FAKE_THREAD_1_KOID),
543                name: Some(FAKE_THREAD_1_NAME.to_string()),
544                ..Default::default()
545            }),
546            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
547                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
548                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
549                ..Default::default()
550            }),
551        ]);
552        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
553
554        // Send the end of stream marker.
555        let fut = receiver_proxy.batch(&[]);
556        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
557
558        // Return the result.
559        receive_worker.await
560    }
561
562    #[test_case(|thread_info| thread_info.thread_info_key = None => matches
563        Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
564    #[test_case(|thread_info| thread_info.koid = None => matches
565        Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
566    #[test_case(|thread_info| thread_info.name = None => matches
567        Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
568    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
569        Ok(_) ; "success")]
570    #[fasync::run_singlethreaded(test)]
571    async fn test_thread_info_required_fields(
572        set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
573    ) -> Result<Snapshot, Error> {
574        let (receiver_proxy, receiver_stream) =
575            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
576        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
577
578        // Start with a ThreadInfo with all the required fields set.
579        let mut thread_info = fheapdump_client::ThreadInfo {
580            thread_info_key: Some(FAKE_THREAD_1_KEY),
581            koid: Some(FAKE_THREAD_1_KOID),
582            name: Some(FAKE_THREAD_1_NAME.to_string()),
583            ..Default::default()
584        };
585
586        // Set one of the fields to None, according to the case being tested.
587        set_one_field_to_none(&mut thread_info);
588
589        // Send it to the SnapshotReceiver.
590        let fut =
591            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
592        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
593
594        // Send the end of stream marker.
595        let fut = receiver_proxy.batch(&[]);
596        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
597
598        // Return the result.
599        receive_worker.await
600    }
601
602    #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
603        Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
604    #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
605        Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
606    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
607        Ok(_) ; "success")]
608    #[fasync::run_singlethreaded(test)]
609    async fn test_stack_trace_required_fields(
610        set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
611    ) -> Result<Snapshot, Error> {
612        let (receiver_proxy, receiver_stream) =
613            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
614        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
615
616        // Start with a StackTrace with all the required fields set.
617        let mut stack_trace = fheapdump_client::StackTrace {
618            stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
619            program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
620            ..Default::default()
621        };
622
623        // Set one of the fields to None, according to the case being tested.
624        set_one_field_to_none(&mut stack_trace);
625
626        // Send it to the SnapshotReceiver.
627        let fut =
628            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
629        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
630
631        // Send the end of stream marker.
632        let fut = receiver_proxy.batch(&[]);
633        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
634
635        // Return the result.
636        receive_worker.await
637    }
638
639    #[test_case(|region| region.address = None => matches
640        Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
641    #[test_case(|region| region.size = None => matches
642        Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
643    #[test_case(|region| region.file_offset = None => matches
644        Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
645    #[test_case(|region| region.build_id = None => matches
646        Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
647    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
648        Ok(_) ; "success")]
649    #[fasync::run_singlethreaded(test)]
650    async fn test_executable_region_required_fields(
651        set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
652    ) -> Result<Snapshot, Error> {
653        let (receiver_proxy, receiver_stream) =
654            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
655        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
656
657        // Start with an ExecutableRegion with all the required fields set.
658        let mut region = fheapdump_client::ExecutableRegion {
659            address: Some(FAKE_REGION_1_ADDRESS),
660            size: Some(FAKE_REGION_1_SIZE),
661            file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
662            build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
663            ..Default::default()
664        };
665
666        // Set one of the fields to None, according to the case being tested.
667        set_one_field_to_none(&mut region);
668
669        // Send it to the SnapshotReceiver.
670        let fut =
671            receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
672        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
673
674        // Send the end of stream marker.
675        let fut = receiver_proxy.batch(&[]);
676        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
677
678        // Return the result.
679        receive_worker.await
680    }
681
682    #[test_case(|block_contents| block_contents.address = None => matches
683        Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
684    #[test_case(|block_contents| block_contents.contents = None => matches
685        Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
686    #[test_case(|_| () /* if we do not set any field to None, the result should be Ok */ => matches
687        Ok(_) ; "success")]
688    #[fasync::run_singlethreaded(test)]
689    async fn test_block_contents_required_fields(
690        set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
691    ) -> Result<Snapshot, Error> {
692        let (receiver_proxy, receiver_stream) =
693            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
694        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
695
696        // Start with a BlockContents with all the required fields set.
697        let mut block_contents = fheapdump_client::BlockContents {
698            address: Some(FAKE_ALLOCATION_1_ADDRESS),
699            contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
700            ..Default::default()
701        };
702
703        // Set one of the fields to None, according to the case being tested.
704        set_one_field_to_none(&mut block_contents);
705
706        // Send it to the SnapshotReceiver along with the allocation it references.
707        let fut = receiver_proxy.batch(&[
708            fheapdump_client::SnapshotElement::BlockContents(block_contents),
709            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
710                address: Some(FAKE_ALLOCATION_1_ADDRESS),
711                size: Some(FAKE_ALLOCATION_1_SIZE),
712                thread_info_key: Some(FAKE_THREAD_1_KEY),
713                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
714                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
715                ..Default::default()
716            }),
717            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
718                thread_info_key: Some(FAKE_THREAD_1_KEY),
719                koid: Some(FAKE_THREAD_1_KOID),
720                name: Some(FAKE_THREAD_1_NAME.to_string()),
721                ..Default::default()
722            }),
723            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
724                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
725                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
726                ..Default::default()
727            }),
728        ]);
729        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
730
731        // Send the end of stream marker.
732        let fut = receiver_proxy.batch(&[]);
733        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
734
735        // Return the result.
736        receive_worker.await
737    }
738
739    #[fasync::run_singlethreaded(test)]
740    async fn test_conflicting_allocations() {
741        let (receiver_proxy, receiver_stream) =
742            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
743        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
744
745        // Send two allocations with the same address along with the stack trace they reference.
746        let fut = receiver_proxy.batch(&[
747            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
748                address: Some(FAKE_ALLOCATION_1_ADDRESS),
749                size: Some(FAKE_ALLOCATION_1_SIZE),
750                thread_info_key: Some(FAKE_THREAD_1_KEY),
751                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
752                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
753                ..Default::default()
754            }),
755            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
756                address: Some(FAKE_ALLOCATION_1_ADDRESS),
757                size: Some(FAKE_ALLOCATION_1_SIZE),
758                thread_info_key: Some(FAKE_THREAD_1_KEY),
759                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
760                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
761                ..Default::default()
762            }),
763            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
764                thread_info_key: Some(FAKE_THREAD_1_KEY),
765                koid: Some(FAKE_THREAD_1_KOID),
766                name: Some(FAKE_THREAD_1_NAME.to_string()),
767                ..Default::default()
768            }),
769            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
770                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
771                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
772                ..Default::default()
773            }),
774        ]);
775        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
776
777        // Send the end of stream marker.
778        let fut = receiver_proxy.batch(&[]);
779        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
780
781        // Verify expected error.
782        assert_matches!(
783            receive_worker.await,
784            Err(Error::ConflictingElement { element_type: "Allocation" })
785        );
786    }
787
788    #[fasync::run_singlethreaded(test)]
789    async fn test_conflicting_executable_regions() {
790        let (receiver_proxy, receiver_stream) =
791            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
792        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
793
794        // Send two executable regions with the same address.
795        let fut = receiver_proxy.batch(&[
796            fheapdump_client::SnapshotElement::ExecutableRegion(
797                fheapdump_client::ExecutableRegion {
798                    address: Some(FAKE_REGION_1_ADDRESS),
799                    size: Some(FAKE_REGION_1_SIZE),
800                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
801                    build_id: Some(fheapdump_client::BuildId {
802                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
803                    }),
804                    ..Default::default()
805                },
806            ),
807            fheapdump_client::SnapshotElement::ExecutableRegion(
808                fheapdump_client::ExecutableRegion {
809                    address: Some(FAKE_REGION_1_ADDRESS),
810                    size: Some(FAKE_REGION_1_SIZE),
811                    file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
812                    build_id: Some(fheapdump_client::BuildId {
813                        value: FAKE_REGION_1_BUILD_ID.to_vec(),
814                    }),
815                    ..Default::default()
816                },
817            ),
818        ]);
819        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
820
821        // Send the end of stream marker.
822        let fut = receiver_proxy.batch(&[]);
823        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
824
825        // Verify expected error.
826        assert_matches!(
827            receive_worker.await,
828            Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
829        );
830    }
831
832    #[fasync::run_singlethreaded(test)]
833    async fn test_block_contents_wrong_size() {
834        let (receiver_proxy, receiver_stream) =
835            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
836        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
837
838        // Send an allocation whose BlockContents has the wrong size.
839        let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
840        let fut = receiver_proxy.batch(&[
841            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
842                address: Some(FAKE_ALLOCATION_1_ADDRESS),
843                size: Some(FAKE_ALLOCATION_1_SIZE),
844                thread_info_key: Some(FAKE_THREAD_1_KEY),
845                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
846                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
847                ..Default::default()
848            }),
849            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
850                address: Some(FAKE_ALLOCATION_1_ADDRESS),
851                contents: Some(contents_with_wrong_size),
852                ..Default::default()
853            }),
854            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
855                thread_info_key: Some(FAKE_THREAD_1_KEY),
856                koid: Some(FAKE_THREAD_1_KOID),
857                name: Some(FAKE_THREAD_1_NAME.to_string()),
858                ..Default::default()
859            }),
860            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
861                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
862                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
863                ..Default::default()
864            }),
865        ]);
866        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
867
868        // Send the end of stream marker.
869        let fut = receiver_proxy.batch(&[]);
870        let _ = fut.await; // ignore result, as the peer may detect the error and close the channel
871
872        // Verify expected error.
873        assert_matches!(
874            receive_worker.await,
875            Err(Error::ConflictingElement { element_type: "BlockContents" })
876        );
877    }
878
879    #[fasync::run_singlethreaded(test)]
880    async fn test_empty_stack_trace() {
881        let (receiver_proxy, receiver_stream) =
882            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
883        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
884
885        // Send an allocation that references an empty stack trace.
886        let fut = receiver_proxy.batch(&[
887            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
888                address: Some(FAKE_ALLOCATION_1_ADDRESS),
889                size: Some(FAKE_ALLOCATION_1_SIZE),
890                thread_info_key: Some(FAKE_THREAD_1_KEY),
891                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
892                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
893                ..Default::default()
894            }),
895            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
896                thread_info_key: Some(FAKE_THREAD_1_KEY),
897                koid: Some(FAKE_THREAD_1_KOID),
898                name: Some(FAKE_THREAD_1_NAME.to_string()),
899                ..Default::default()
900            }),
901            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
902                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
903                program_addresses: Some(vec![]),
904                ..Default::default()
905            }),
906        ]);
907        fut.await.unwrap();
908
909        // Send the end of stream marker.
910        let fut = receiver_proxy.batch(&[]);
911        fut.await.unwrap();
912
913        // Verify that the stack trace has been reconstructed correctly.
914        let mut received_snapshot = receive_worker.await.unwrap();
915        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
916        assert_eq!(allocation1.stack_trace.program_addresses, []);
917    }
918
919    #[fasync::run_singlethreaded(test)]
920    async fn test_chunked_stack_trace() {
921        let (receiver_proxy, receiver_stream) =
922            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
923        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
924
925        // Send an allocation and the first chunk of its stack trace.
926        let fut = receiver_proxy.batch(&[
927            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
928                address: Some(FAKE_ALLOCATION_1_ADDRESS),
929                size: Some(FAKE_ALLOCATION_1_SIZE),
930                thread_info_key: Some(FAKE_THREAD_1_KEY),
931                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
932                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
933                ..Default::default()
934            }),
935            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
936                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
937                program_addresses: Some(vec![1111, 2222]),
938                ..Default::default()
939            }),
940            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
941                thread_info_key: Some(FAKE_THREAD_1_KEY),
942                koid: Some(FAKE_THREAD_1_KOID),
943                name: Some(FAKE_THREAD_1_NAME.to_string()),
944                ..Default::default()
945            }),
946        ]);
947        fut.await.unwrap();
948
949        // Send the second chunk.
950        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
951            fheapdump_client::StackTrace {
952                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
953                program_addresses: Some(vec![3333]),
954                ..Default::default()
955            },
956        )]);
957        fut.await.unwrap();
958
959        // Send the end of stream marker.
960        let fut = receiver_proxy.batch(&[]);
961        fut.await.unwrap();
962
963        // Verify that the stack trace has been reconstructed correctly.
964        let mut received_snapshot = receive_worker.await.unwrap();
965        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
966        assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
967    }
968
969    #[fasync::run_singlethreaded(test)]
970    async fn test_empty_block_contents() {
971        let (receiver_proxy, receiver_stream) =
972            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
973        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
974
975        // Send a zero-sized allocation and its empty contents.
976        let fut = receiver_proxy.batch(&[
977            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
978                address: Some(FAKE_ALLOCATION_1_ADDRESS),
979                size: Some(0),
980                thread_info_key: Some(FAKE_THREAD_1_KEY),
981                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
982                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
983                ..Default::default()
984            }),
985            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
986                thread_info_key: Some(FAKE_THREAD_1_KEY),
987                koid: Some(FAKE_THREAD_1_KOID),
988                name: Some(FAKE_THREAD_1_NAME.to_string()),
989                ..Default::default()
990            }),
991            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
992                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
993                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
994                ..Default::default()
995            }),
996            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
997                address: Some(FAKE_ALLOCATION_1_ADDRESS),
998                contents: Some(vec![]),
999                ..Default::default()
1000            }),
1001        ]);
1002        fut.await.unwrap();
1003
1004        // Send the end of stream marker.
1005        let fut = receiver_proxy.batch(&[]);
1006        fut.await.unwrap();
1007
1008        // Verify that the allocation has been reconstructed correctly.
1009        let mut received_snapshot = receive_worker.await.unwrap();
1010        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
1011        assert_eq!(allocation1.contents.expect("contents must be set"), []);
1012    }
1013
1014    #[fasync::run_singlethreaded(test)]
1015    async fn test_chunked_block_contents() {
1016        let (receiver_proxy, receiver_stream) =
1017            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1018        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1019
1020        // Split the contents in two halves.
1021        let (content_first_chunk, contents_second_chunk) =
1022            FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1023
1024        // Send an allocation and the first chunk of its contents.
1025        let fut = receiver_proxy.batch(&[
1026            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1027                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1028                size: Some(FAKE_ALLOCATION_1_SIZE),
1029                thread_info_key: Some(FAKE_THREAD_1_KEY),
1030                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1031                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1032                ..Default::default()
1033            }),
1034            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1035                thread_info_key: Some(FAKE_THREAD_1_KEY),
1036                koid: Some(FAKE_THREAD_1_KOID),
1037                name: Some(FAKE_THREAD_1_NAME.to_string()),
1038                ..Default::default()
1039            }),
1040            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1041                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1042                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1043                ..Default::default()
1044            }),
1045            fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1046                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1047                contents: Some(content_first_chunk.to_vec()),
1048                ..Default::default()
1049            }),
1050        ]);
1051        fut.await.unwrap();
1052
1053        // Send the second chunk.
1054        let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1055            fheapdump_client::BlockContents {
1056                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1057                contents: Some(contents_second_chunk.to_vec()),
1058                ..Default::default()
1059            },
1060        )]);
1061        fut.await.unwrap();
1062
1063        // Send the end of stream marker.
1064        let fut = receiver_proxy.batch(&[]);
1065        fut.await.unwrap();
1066
1067        // Verify that the allocation's block contents have been reconstructed correctly.
1068        let mut received_snapshot = receive_worker.await.unwrap();
1069        let allocation1 = received_snapshot.allocations.remove(&FAKE_ALLOCATION_1_ADDRESS).unwrap();
1070        assert_eq!(allocation1.contents.expect("contents must be set"), FAKE_ALLOCATION_1_CONTENTS);
1071    }
1072
1073    #[fasync::run_singlethreaded(test)]
1074    async fn test_missing_end_of_stream() {
1075        let (receiver_proxy, receiver_stream) =
1076            create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1077        let receive_worker = fasync::Task::local(Snapshot::receive_from(receiver_stream));
1078
1079        // Send an allocation and its stack trace.
1080        let fut = receiver_proxy.batch(&[
1081            fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1082                address: Some(FAKE_ALLOCATION_1_ADDRESS),
1083                size: Some(FAKE_ALLOCATION_1_SIZE),
1084                thread_info_key: Some(FAKE_THREAD_1_KEY),
1085                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1086                timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1087                ..Default::default()
1088            }),
1089            fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1090                thread_info_key: Some(FAKE_THREAD_1_KEY),
1091                koid: Some(FAKE_THREAD_1_KOID),
1092                name: Some(FAKE_THREAD_1_NAME.to_string()),
1093                ..Default::default()
1094            }),
1095            fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1096                stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1097                program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1098                ..Default::default()
1099            }),
1100        ]);
1101        fut.await.unwrap();
1102
1103        // Close the channel without sending an end of stream marker.
1104        std::mem::drop(receiver_proxy);
1105
1106        // Expect an UnexpectedEndOfStream error.
1107        assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1108    }
1109}