1use fidl::MonotonicInstant;
6use flex_fuchsia_memory_heapdump_client as fheapdump_client;
7use futures::StreamExt;
8use std::collections::{HashMap, HashSet};
9use std::rc::Rc;
10
11use crate::Error;
12
/// A [`Snapshot`] together with the `process_name` and `process_koid` read from the
/// `SnapshotHeader` element that introduced it in the stream.
#[derive(Debug)]
pub struct SnapshotWithHeader {
    /// Value of the `process_name` field of the header.
    pub process_name: String,
    /// Value of the `process_koid` field of the header.
    pub process_koid: u64,
    /// The snapshot received right after the header.
    pub snapshot: Snapshot,
}
20
/// A fully received snapshot, with all cross-references between elements resolved.
#[derive(Debug)]
pub struct Snapshot {
    /// The received allocations, each with its thread info, stack trace and (optional)
    /// contents already attached.
    pub allocations: Vec<Allocation>,

    /// The received executable regions, keyed by the `address` field of the element that
    /// described them.
    pub executable_regions: HashMap<u64, ExecutableRegion>,
}
30
/// A single allocation entry of a [`Snapshot`].
#[derive(Debug)]
pub struct Allocation {
    /// Address reported by the sender, if any. Two entries with the same address are
    /// rejected while receiving.
    pub address: Option<u64>,

    /// Value of the element's `count` field; defaults to 1 when the field is absent or when
    /// building for an API level older than 29.
    pub count: u64,

    /// Size reported by the sender. When `contents` is present, its length in bytes equals
    /// this value.
    pub size: u64,

    /// Thread info referenced by the allocation, shared among all the allocations that
    /// reference the same key. `None` if the allocation carried no thread info key.
    pub thread_info: Option<Rc<ThreadInfo>>,

    /// Stack trace referenced by the allocation, shared among all the allocations that
    /// reference the same key.
    pub stack_trace: Rc<StackTrace>,

    /// Timestamp reported by the sender, if any (presumably the allocation time -- TODO
    /// confirm against the FIDL definition).
    pub timestamp: Option<MonotonicInstant>,

    /// Bytes received through `BlockContents` elements for this allocation's address, if any.
    pub contents: Option<Vec<u8>>,
}
54
/// A stack trace, possibly shared by several [`Allocation`]s.
#[derive(Debug)]
pub struct StackTrace {
    /// Program addresses, concatenated in arrival order from every `StackTrace` element that
    /// carried the same key.
    pub program_addresses: Vec<u64>,
}
61
/// Description of an executable region, as received from the sender.
#[derive(Debug)]
pub struct ExecutableRegion {
    /// Name of the region. Empty if the sender did not provide one or when building for an
    /// API level older than 27.
    pub name: String,

    /// Value of the element's `size` field.
    pub size: u64,

    /// Value of the element's `file_offset` field.
    pub file_offset: u64,

    /// Value of the element's `vaddr` field. `None` if the sender did not provide one or when
    /// building for an API level older than 27.
    pub vaddr: Option<u64>,

    /// Raw bytes of the element's build ID.
    pub build_id: Vec<u8>,
}
83
/// Information about a thread, possibly shared by several [`Allocation`]s.
#[derive(Debug, PartialEq)]
pub struct ThreadInfo {
    /// Value of the element's `koid` field.
    pub koid: zx_types::zx_koid_t,

    /// Value of the element's `name` field.
    pub name: String,
}
93
/// Reads the optional field `$f` out of the expression `$e`.
///
/// Expands to a `Result`: `Ok(value)` if the field is `Some(value)`, otherwise
/// `Err(Error::MissingField { container, field })`, where `container` and `field` are the
/// stringified `$c` and `$f` tokens. `$c` is only used for the error message.
///
/// `Error` is resolved at the invocation site.
macro_rules! read_field {
    ($e:expr => $c:ident, $f:ident) => {
        match $e.$f {
            Some(value) => Ok(value),
            None => Err(Error::MissingField {
                container: stringify!($c),
                field: stringify!($f),
            }),
        }
    };
}
116
117impl Snapshot {
118 pub async fn receive_single_from(
120 mut stream: fheapdump_client::SnapshotReceiverRequestStream,
121 ) -> Result<Snapshot, Error> {
122 Snapshot::receive_inner(&mut stream).await
123 }
124
125 #[cfg(fuchsia_api_level_at_least = "HEAD")]
128 pub async fn receive_multi_from(
129 mut stream: fheapdump_client::SnapshotReceiverRequestStream,
130 ) -> Result<Vec<SnapshotWithHeader>, Error> {
131 let mut snapshots = vec![];
132 loop {
133 match stream.next().await.transpose()? {
136 Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
137 match &batch[..] {
138 [fheapdump_client::SnapshotElement::SnapshotHeader(header)] => {
139 responder.send()?;
140
141 let snapshot = Snapshot::receive_inner(&mut stream).await?;
143
144 let header = header.clone();
145 snapshots.push(SnapshotWithHeader {
146 process_name: read_field!(header => SnapshotHeader, process_name)?,
147 process_koid: read_field!(header => SnapshotHeader, process_koid)?,
148 snapshot,
149 });
150 }
151 [] => {
152 responder.send()?;
153 return Ok(snapshots);
154 }
155 _ => return Err(Error::HeaderExpected),
156 }
157 }
158 Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
159 error,
160 responder,
161 }) => {
162 let _ = responder.send(); return Err(Error::CollectorError(error));
164 }
165 None => return Err(Error::UnexpectedEndOfStream),
166 };
167 }
168 }
169
170 async fn receive_inner(
171 stream: &mut fheapdump_client::SnapshotReceiverRequestStream,
172 ) -> Result<Snapshot, Error> {
173 struct AllocationValue {
174 address: Option<u64>,
175 count: u64,
176 size: u64,
177 thread_info_key: Option<u64>,
178 stack_trace_key: u64,
179 timestamp: Option<MonotonicInstant>,
180 }
181 let mut allocation_addresses: HashSet<u64> = HashSet::new();
182 let mut allocations: Vec<AllocationValue> = vec![];
183 let mut thread_infos: HashMap<u64, Rc<ThreadInfo>> = HashMap::new();
184 let mut stack_traces: HashMap<u64, Vec<u64>> = HashMap::new();
185 let mut executable_regions: HashMap<u64, ExecutableRegion> = HashMap::new();
186 let mut contents: HashMap<u64, Vec<u8>> = HashMap::new();
187
188 loop {
189 let batch = match stream.next().await.transpose()? {
191 Some(fheapdump_client::SnapshotReceiverRequest::Batch { batch, responder }) => {
192 responder.send()?;
194 batch
195 }
196 Some(fheapdump_client::SnapshotReceiverRequest::ReportError {
197 error,
198 responder,
199 }) => {
200 let _ = responder.send(); return Err(Error::CollectorError(error));
202 }
203 None => return Err(Error::UnexpectedEndOfStream),
204 };
205
206 if !batch.is_empty() {
208 for element in batch {
209 match element {
210 fheapdump_client::SnapshotElement::Allocation(allocation) => {
211 if let Some(address) = allocation.address {
212 if !allocation_addresses.insert(address) {
213 return Err(Error::ConflictingElement {
214 element_type: "Allocation",
215 });
216 }
217 }
218
219 #[cfg(not(fuchsia_api_level_at_least = "29"))]
220 let count = 1;
221 #[cfg(fuchsia_api_level_at_least = "29")]
222 let count = allocation.count.unwrap_or(1);
223
224 let size = read_field!(allocation => Allocation, size)?;
225 let stack_trace_key =
226 read_field!(allocation => Allocation, stack_trace_key)?;
227 allocations.push(AllocationValue {
228 address: allocation.address,
229 count,
230 size,
231 thread_info_key: allocation.thread_info_key,
232 stack_trace_key,
233 timestamp: allocation.timestamp,
234 });
235 }
236 fheapdump_client::SnapshotElement::StackTrace(stack_trace) => {
237 let stack_trace_key =
238 read_field!(stack_trace => StackTrace, stack_trace_key)?;
239 let mut program_addresses =
240 read_field!(stack_trace => StackTrace, program_addresses)?;
241 stack_traces
242 .entry(stack_trace_key)
243 .or_default()
244 .append(&mut program_addresses);
245 }
246 fheapdump_client::SnapshotElement::ThreadInfo(thread_info) => {
247 let thread_info_key =
248 read_field!(thread_info => ThreadInfo, thread_info_key)?;
249 let koid = read_field!(thread_info => ThreadInfo, koid)?;
250 let name = read_field!(thread_info => ThreadInfo, name)?;
251 if thread_infos
252 .insert(thread_info_key, Rc::new(ThreadInfo { koid, name }))
253 .is_some()
254 {
255 return Err(Error::ConflictingElement {
256 element_type: "ThreadInfo",
257 });
258 }
259 }
260 fheapdump_client::SnapshotElement::ExecutableRegion(region) => {
261 let address = read_field!(region => ExecutableRegion, address)?;
262 #[cfg(fuchsia_api_level_at_least = "27")]
263 let name = region.name.unwrap_or_else(|| String::new());
264 #[cfg(not(fuchsia_api_level_at_least = "27"))]
265 let name = String::new();
266 let size = read_field!(region => ExecutableRegion, size)?;
267 let file_offset = read_field!(region => ExecutableRegion, file_offset)?;
268 #[cfg(fuchsia_api_level_at_least = "27")]
269 let vaddr = region.vaddr;
270 #[cfg(not(fuchsia_api_level_at_least = "27"))]
271 let vaddr = None;
272 let build_id = read_field!(region => ExecutableRegion, build_id)?.value;
273 let region =
274 ExecutableRegion { name, size, file_offset, vaddr, build_id };
275 if executable_regions.insert(address, region).is_some() {
276 return Err(Error::ConflictingElement {
277 element_type: "ExecutableRegion",
278 });
279 }
280 }
281 fheapdump_client::SnapshotElement::BlockContents(block_contents) => {
282 let address = read_field!(block_contents => BlockContents, address)?;
283 let mut chunk = read_field!(block_contents => BlockContents, contents)?;
284 contents.entry(address).or_default().append(&mut chunk);
285 }
286 _ => return Err(Error::UnexpectedElementType),
287 }
288 }
289 } else {
290 let final_stack_traces: HashMap<u64, Rc<StackTrace>> = stack_traces
293 .into_iter()
294 .map(|(key, program_addresses)| {
295 (key, Rc::new(StackTrace { program_addresses }))
296 })
297 .collect();
298 let mut final_allocations = vec![];
299 for AllocationValue {
300 address,
301 count,
302 size,
303 thread_info_key,
304 stack_trace_key,
305 timestamp,
306 } in allocations
307 {
308 let thread_info = match thread_info_key {
309 Some(key) => Some(
310 thread_infos
311 .get(&key)
312 .ok_or(Error::InvalidCrossReference { element_type: "ThreadInfo" })?
313 .clone(),
314 ),
315 None => None,
316 };
317 let stack_trace = final_stack_traces
318 .get(&stack_trace_key)
319 .ok_or(Error::InvalidCrossReference { element_type: "StackTrace" })?
320 .clone();
321 let contents = address.and_then(|address| contents.remove(&address));
322 if let Some(data) = &contents {
323 if data.len() as u64 != size {
324 return Err(Error::ConflictingElement {
325 element_type: "BlockContents",
326 });
327 }
328 }
329 final_allocations.push(Allocation {
330 address,
331 count,
332 size,
333 thread_info,
334 stack_trace,
335 timestamp,
336 contents,
337 });
338 }
339
340 return Ok(Snapshot { allocations: final_allocations, executable_regions });
341 }
342 }
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use crate::test_helpers::create_client;
350 use assert_matches::assert_matches;
351 use fuchsia_async as fasync;
352 use test_case::test_case;
353
    // Fake allocations. Only the first one has contents, whose length equals its size.
    const FAKE_ALLOCATION_1_ADDRESS: u64 = 1234;
    const FAKE_ALLOCATION_1_SIZE: u64 = 8;
    const FAKE_ALLOCATION_1_TIMESTAMP: MonotonicInstant = MonotonicInstant::from_nanos(888888888);
    const FAKE_ALLOCATION_1_CONTENTS: [u8; FAKE_ALLOCATION_1_SIZE as usize] = *b"12345678";
    const FAKE_ALLOCATION_2_ADDRESS: u64 = 5678;
    const FAKE_ALLOCATION_2_SIZE: u64 = 4;
    const FAKE_ALLOCATION_2_TIMESTAMP: MonotonicInstant = MonotonicInstant::from_nanos(-777777777);

    // Fake threads, with the keys used to reference them from allocations.
    const FAKE_THREAD_1_KOID: u64 = 1212;
    const FAKE_THREAD_1_NAME: &str = "fake-thread-1-name";
    const FAKE_THREAD_1_KEY: u64 = 4567;
    const FAKE_THREAD_2_KOID: u64 = 1213;
    const FAKE_THREAD_2_NAME: &str = "fake-thread-2-name";
    const FAKE_THREAD_2_KEY: u64 = 7654;

    // Fake stack traces, with the keys used to reference them from allocations.
    const FAKE_STACK_TRACE_1_ADDRESSES: [u64; 6] = [11111, 22222, 33333, 22222, 44444, 55555];
    const FAKE_STACK_TRACE_1_KEY: u64 = 9876;
    const FAKE_STACK_TRACE_2_ADDRESSES: [u64; 4] = [11111, 22222, 11111, 66666];
    const FAKE_STACK_TRACE_2_KEY: u64 = 6789;

    // Fake executable regions. Only the first one has a name and a vaddr.
    const FAKE_REGION_1_ADDRESS: u64 = 0x10000000;
    const FAKE_REGION_1_NAME: &str = "region-1";
    const FAKE_REGION_1_SIZE: u64 = 0x80000;
    const FAKE_REGION_1_FILE_OFFSET: u64 = 0x1000;
    const FAKE_REGION_1_VADDR: u64 = 0x3000;
    const FAKE_REGION_1_BUILD_ID: &[u8] = &[0xaa; 20];
    const FAKE_REGION_2_ADDRESS: u64 = 0x7654300000;
    const FAKE_REGION_2_SIZE: u64 = 0x200000;
    const FAKE_REGION_2_FILE_OFFSET: u64 = 0x2000;
    const FAKE_REGION_2_BUILD_ID: &[u8] = &[0x55; 32];
382
383 #[fasync::run_singlethreaded(test)]
384 async fn test_empty() {
385 let client = create_client();
386 let (receiver_proxy, receiver_stream) =
387 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
388 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
389
390 let fut = receiver_proxy.batch(&[]);
392 fut.await.unwrap();
393
394 let received_snapshot = receive_worker.await.unwrap();
396 assert!(received_snapshot.allocations.is_empty());
397 assert!(received_snapshot.executable_regions.is_empty());
398 }
399
400 #[fasync::run_singlethreaded(test)]
401 async fn test_one_batch() {
402 let client = create_client();
403 let (receiver_proxy, receiver_stream) =
404 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
405 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
406
407 let fut = receiver_proxy.batch(&[
411 fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
412 address: Some(FAKE_ALLOCATION_1_ADDRESS),
413 contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
414 ..Default::default()
415 }),
416 fheapdump_client::SnapshotElement::ExecutableRegion(
417 fheapdump_client::ExecutableRegion {
418 address: Some(FAKE_REGION_1_ADDRESS),
419 name: Some(FAKE_REGION_1_NAME.to_string()),
420 size: Some(FAKE_REGION_1_SIZE),
421 file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
422 vaddr: Some(FAKE_REGION_1_VADDR),
423 build_id: Some(fheapdump_client::BuildId {
424 value: FAKE_REGION_1_BUILD_ID.to_vec(),
425 }),
426 ..Default::default()
427 },
428 ),
429 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
430 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
431 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
432 ..Default::default()
433 }),
434 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
435 address: Some(FAKE_ALLOCATION_1_ADDRESS),
436 size: Some(FAKE_ALLOCATION_1_SIZE),
437 thread_info_key: Some(FAKE_THREAD_1_KEY),
438 stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
439 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
440 ..Default::default()
441 }),
442 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
443 thread_info_key: Some(FAKE_THREAD_1_KEY),
444 koid: Some(FAKE_THREAD_1_KOID),
445 name: Some(FAKE_THREAD_1_NAME.to_string()),
446 ..Default::default()
447 }),
448 fheapdump_client::SnapshotElement::ExecutableRegion(
449 fheapdump_client::ExecutableRegion {
450 address: Some(FAKE_REGION_2_ADDRESS),
451 size: Some(FAKE_REGION_2_SIZE),
452 file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
453 build_id: Some(fheapdump_client::BuildId {
454 value: FAKE_REGION_2_BUILD_ID.to_vec(),
455 }),
456 ..Default::default()
457 },
458 ),
459 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
460 thread_info_key: Some(FAKE_THREAD_2_KEY),
461 koid: Some(FAKE_THREAD_2_KOID),
462 name: Some(FAKE_THREAD_2_NAME.to_string()),
463 ..Default::default()
464 }),
465 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
466 address: Some(FAKE_ALLOCATION_2_ADDRESS),
467 size: Some(FAKE_ALLOCATION_2_SIZE),
468 thread_info_key: Some(FAKE_THREAD_2_KEY),
469 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
470 timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
471 ..Default::default()
472 }),
473 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
474 stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
475 program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
476 ..Default::default()
477 }),
478 ]);
479 fut.await.unwrap();
480
481 let fut = receiver_proxy.batch(&[]);
483 fut.await.unwrap();
484
485 let mut received_snapshot = receive_worker.await.unwrap();
487 let allocation1 = received_snapshot.allocations.swap_remove(
488 received_snapshot
489 .allocations
490 .iter()
491 .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
492 .unwrap(),
493 );
494 assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
495 assert_eq!(
496 allocation1.thread_info,
497 Some(Rc::new(ThreadInfo {
498 koid: FAKE_THREAD_1_KOID,
499 name: FAKE_THREAD_1_NAME.to_owned()
500 }))
501 );
502 assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
503 assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
504 assert_eq!(
505 allocation1.contents.as_ref().expect("contents must be set"),
506 &FAKE_ALLOCATION_1_CONTENTS.to_vec()
507 );
508 let allocation2 = received_snapshot.allocations.swap_remove(
509 received_snapshot
510 .allocations
511 .iter()
512 .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
513 .unwrap(),
514 );
515 assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
516 assert_eq!(
517 allocation2.thread_info,
518 Some(Rc::new(ThreadInfo {
519 koid: FAKE_THREAD_2_KOID,
520 name: FAKE_THREAD_2_NAME.to_owned()
521 }))
522 );
523 assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
524 assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
525 assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
526 assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
527 let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
528 assert_eq!(region1.name, FAKE_REGION_1_NAME);
529 assert_eq!(region1.size, FAKE_REGION_1_SIZE);
530 assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
531 assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
532 assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
533 let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
534 assert_eq!(region2.size, FAKE_REGION_2_SIZE);
535 assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
536 assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
537 assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
538 }
539
540 #[fasync::run_singlethreaded(test)]
541 async fn test_two_batches() {
542 let client = create_client();
543 let (receiver_proxy, receiver_stream) =
544 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
545 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
546
547 let fut = receiver_proxy.batch(&[
549 fheapdump_client::SnapshotElement::ExecutableRegion(
550 fheapdump_client::ExecutableRegion {
551 address: Some(FAKE_REGION_2_ADDRESS),
552 size: Some(FAKE_REGION_2_SIZE),
553 file_offset: Some(FAKE_REGION_2_FILE_OFFSET),
554 build_id: Some(fheapdump_client::BuildId {
555 value: FAKE_REGION_2_BUILD_ID.to_vec(),
556 }),
557 ..Default::default()
558 },
559 ),
560 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
561 address: Some(FAKE_ALLOCATION_1_ADDRESS),
562 size: Some(FAKE_ALLOCATION_1_SIZE),
563 thread_info_key: Some(FAKE_THREAD_1_KEY),
564 stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
565 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
566 ..Default::default()
567 }),
568 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
569 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
570 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
571 ..Default::default()
572 }),
573 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
574 thread_info_key: Some(FAKE_THREAD_2_KEY),
575 koid: Some(FAKE_THREAD_2_KOID),
576 name: Some(FAKE_THREAD_2_NAME.to_string()),
577 ..Default::default()
578 }),
579 ]);
580 fut.await.unwrap();
581
582 let fut = receiver_proxy.batch(&[
584 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
585 thread_info_key: Some(FAKE_THREAD_1_KEY),
586 koid: Some(FAKE_THREAD_1_KOID),
587 name: Some(FAKE_THREAD_1_NAME.to_string()),
588 ..Default::default()
589 }),
590 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
591 address: Some(FAKE_ALLOCATION_2_ADDRESS),
592 size: Some(FAKE_ALLOCATION_2_SIZE),
593 thread_info_key: Some(FAKE_THREAD_2_KEY),
594 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
595 timestamp: Some(FAKE_ALLOCATION_2_TIMESTAMP),
596 ..Default::default()
597 }),
598 fheapdump_client::SnapshotElement::ExecutableRegion(
599 fheapdump_client::ExecutableRegion {
600 address: Some(FAKE_REGION_1_ADDRESS),
601 name: Some(FAKE_REGION_1_NAME.to_string()),
602 size: Some(FAKE_REGION_1_SIZE),
603 file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
604 vaddr: Some(FAKE_REGION_1_VADDR),
605 build_id: Some(fheapdump_client::BuildId {
606 value: FAKE_REGION_1_BUILD_ID.to_vec(),
607 }),
608 ..Default::default()
609 },
610 ),
611 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
612 stack_trace_key: Some(FAKE_STACK_TRACE_2_KEY),
613 program_addresses: Some(FAKE_STACK_TRACE_2_ADDRESSES.to_vec()),
614 ..Default::default()
615 }),
616 fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
617 address: Some(FAKE_ALLOCATION_1_ADDRESS),
618 contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
619 ..Default::default()
620 }),
621 ]);
622 fut.await.unwrap();
623
624 let fut = receiver_proxy.batch(&[]);
626 fut.await.unwrap();
627
628 let mut received_snapshot = receive_worker.await.unwrap();
630 let allocation1 = received_snapshot.allocations.swap_remove(
631 received_snapshot
632 .allocations
633 .iter()
634 .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
635 .unwrap(),
636 );
637 assert_eq!(allocation1.size, FAKE_ALLOCATION_1_SIZE);
638 assert_eq!(
639 allocation1.thread_info,
640 Some(Rc::new(ThreadInfo {
641 koid: FAKE_THREAD_1_KOID,
642 name: FAKE_THREAD_1_NAME.to_owned()
643 }))
644 );
645 assert_eq!(allocation1.stack_trace.program_addresses, FAKE_STACK_TRACE_2_ADDRESSES);
646 assert_eq!(allocation1.timestamp, Some(FAKE_ALLOCATION_1_TIMESTAMP));
647 assert_eq!(
648 allocation1.contents.as_ref().expect("contents must be set"),
649 &FAKE_ALLOCATION_1_CONTENTS.to_vec()
650 );
651 let allocation2 = received_snapshot.allocations.swap_remove(
652 received_snapshot
653 .allocations
654 .iter()
655 .position(|alloc| alloc.address == Some(FAKE_ALLOCATION_2_ADDRESS))
656 .unwrap(),
657 );
658 assert_eq!(allocation2.size, FAKE_ALLOCATION_2_SIZE);
659 assert_eq!(
660 allocation2.thread_info,
661 Some(Rc::new(ThreadInfo {
662 koid: FAKE_THREAD_2_KOID,
663 name: FAKE_THREAD_2_NAME.to_owned()
664 }))
665 );
666 assert_eq!(allocation2.stack_trace.program_addresses, FAKE_STACK_TRACE_1_ADDRESSES);
667 assert_eq!(allocation2.timestamp, Some(FAKE_ALLOCATION_2_TIMESTAMP));
668 assert_matches!(allocation2.contents, None, "no contents are sent for this allocation");
669 assert!(received_snapshot.allocations.is_empty(), "all the entries have been removed");
670 let region1 = received_snapshot.executable_regions.remove(&FAKE_REGION_1_ADDRESS).unwrap();
671 assert_eq!(region1.name, FAKE_REGION_1_NAME);
672 assert_eq!(region1.size, FAKE_REGION_1_SIZE);
673 assert_eq!(region1.file_offset, FAKE_REGION_1_FILE_OFFSET);
674 assert_eq!(region1.vaddr.unwrap(), FAKE_REGION_1_VADDR);
675 assert_eq!(region1.build_id, FAKE_REGION_1_BUILD_ID);
676 let region2 = received_snapshot.executable_regions.remove(&FAKE_REGION_2_ADDRESS).unwrap();
677 assert_eq!(region2.size, FAKE_REGION_2_SIZE);
678 assert_eq!(region2.file_offset, FAKE_REGION_2_FILE_OFFSET);
679 assert_eq!(region2.build_id, FAKE_REGION_2_BUILD_ID);
680 assert!(received_snapshot.executable_regions.is_empty(), "all entries have been removed");
681 }
682
683 #[test_case(|allocation| allocation.size = None => matches
684 Err(Error::MissingField { container: "Allocation", field: "size" }) ; "size")]
685 #[test_case(|allocation| allocation.stack_trace_key = None => matches
686 Err(Error::MissingField { container: "Allocation", field: "stack_trace_key" }) ; "stack_trace_key")]
687 #[test_case(|allocation| allocation.address = None => matches
688 Ok(_) ; "address_is_optional")]
689 #[test_case(|allocation| allocation.thread_info_key = None => matches
690 Ok(_) ; "thread_info_is_optional")]
691 #[test_case(|allocation| allocation.timestamp = None => matches
692 Ok(_) ; "timestamp_is_optional")]
693 #[test_case(|_| () => matches
694 Ok(_) ; "success")]
695 #[fasync::run_singlethreaded(test)]
696 async fn test_allocation_required_fields(
697 set_one_field_to_none: fn(&mut fheapdump_client::Allocation),
698 ) -> Result<Snapshot, Error> {
699 let client = create_client();
700 let (receiver_proxy, receiver_stream) =
701 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
702 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
703
704 let mut allocation = fheapdump_client::Allocation {
706 address: Some(FAKE_ALLOCATION_1_ADDRESS),
707 size: Some(FAKE_ALLOCATION_1_SIZE),
708 thread_info_key: Some(FAKE_THREAD_1_KEY),
709 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
710 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
711 ..Default::default()
712 };
713
714 set_one_field_to_none(&mut allocation);
716
717 let fut = receiver_proxy.batch(&[
719 fheapdump_client::SnapshotElement::Allocation(allocation),
720 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
721 thread_info_key: Some(FAKE_THREAD_1_KEY),
722 koid: Some(FAKE_THREAD_1_KOID),
723 name: Some(FAKE_THREAD_1_NAME.to_string()),
724 ..Default::default()
725 }),
726 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
727 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
728 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
729 ..Default::default()
730 }),
731 ]);
732 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
736 let _ = fut.await; receive_worker.await
740 }
741
742 #[test_case(|thread_info| thread_info.thread_info_key = None => matches
743 Err(Error::MissingField { container: "ThreadInfo", field: "thread_info_key" }) ; "thread_info_key")]
744 #[test_case(|thread_info| thread_info.koid = None => matches
745 Err(Error::MissingField { container: "ThreadInfo", field: "koid" }) ; "koid")]
746 #[test_case(|thread_info| thread_info.name = None => matches
747 Err(Error::MissingField { container: "ThreadInfo", field: "name" }) ; "name")]
748 #[test_case(|_| () => matches
749 Ok(_) ; "success")]
750 #[fasync::run_singlethreaded(test)]
751 async fn test_thread_info_required_fields(
752 set_one_field_to_none: fn(&mut fheapdump_client::ThreadInfo),
753 ) -> Result<Snapshot, Error> {
754 let client = create_client();
755 let (receiver_proxy, receiver_stream) =
756 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
757 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
758
759 let mut thread_info = fheapdump_client::ThreadInfo {
761 thread_info_key: Some(FAKE_THREAD_1_KEY),
762 koid: Some(FAKE_THREAD_1_KOID),
763 name: Some(FAKE_THREAD_1_NAME.to_string()),
764 ..Default::default()
765 };
766
767 set_one_field_to_none(&mut thread_info);
769
770 let fut =
772 receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ThreadInfo(thread_info)]);
773 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
777 let _ = fut.await; receive_worker.await
781 }
782
783 #[test_case(|stack_trace| stack_trace.stack_trace_key = None => matches
784 Err(Error::MissingField { container: "StackTrace", field: "stack_trace_key" }) ; "stack_trace_key")]
785 #[test_case(|stack_trace| stack_trace.program_addresses = None => matches
786 Err(Error::MissingField { container: "StackTrace", field: "program_addresses" }) ; "program_addresses")]
787 #[test_case(|_| () => matches
788 Ok(_) ; "success")]
789 #[fasync::run_singlethreaded(test)]
790 async fn test_stack_trace_required_fields(
791 set_one_field_to_none: fn(&mut fheapdump_client::StackTrace),
792 ) -> Result<Snapshot, Error> {
793 let client = create_client();
794 let (receiver_proxy, receiver_stream) =
795 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
796 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
797
798 let mut stack_trace = fheapdump_client::StackTrace {
800 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
801 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
802 ..Default::default()
803 };
804
805 set_one_field_to_none(&mut stack_trace);
807
808 let fut =
810 receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(stack_trace)]);
811 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
815 let _ = fut.await; receive_worker.await
819 }
820
821 #[test_case(|region| region.address = None => matches
822 Err(Error::MissingField { container: "ExecutableRegion", field: "address" }) ; "address")]
823 #[test_case(|region| region.size = None => matches
824 Err(Error::MissingField { container: "ExecutableRegion", field: "size" }) ; "size")]
825 #[test_case(|region| region.file_offset = None => matches
826 Err(Error::MissingField { container: "ExecutableRegion", field: "file_offset" }) ; "file_offset")]
827 #[test_case(|region| region.build_id = None => matches
828 Err(Error::MissingField { container: "ExecutableRegion", field: "build_id" }) ; "build_id")]
829 #[test_case(|_| () => matches
830 Ok(_) ; "success")]
831 #[fasync::run_singlethreaded(test)]
832 async fn test_executable_region_required_fields(
833 set_one_field_to_none: fn(&mut fheapdump_client::ExecutableRegion),
834 ) -> Result<Snapshot, Error> {
835 let client = create_client();
836 let (receiver_proxy, receiver_stream) =
837 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
838 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
839
840 let mut region = fheapdump_client::ExecutableRegion {
842 address: Some(FAKE_REGION_1_ADDRESS),
843 size: Some(FAKE_REGION_1_SIZE),
844 file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
845 build_id: Some(fheapdump_client::BuildId { value: FAKE_REGION_1_BUILD_ID.to_vec() }),
846 ..Default::default()
847 };
848
849 set_one_field_to_none(&mut region);
851
852 let fut =
854 receiver_proxy.batch(&[fheapdump_client::SnapshotElement::ExecutableRegion(region)]);
855 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
859 let _ = fut.await; receive_worker.await
863 }
864
865 #[test_case(|block_contents| block_contents.address = None => matches
866 Err(Error::MissingField { container: "BlockContents", field: "address" }) ; "address")]
867 #[test_case(|block_contents| block_contents.contents = None => matches
868 Err(Error::MissingField { container: "BlockContents", field: "contents" }) ; "contents")]
869 #[test_case(|_| () => matches
870 Ok(_) ; "success")]
871 #[fasync::run_singlethreaded(test)]
872 async fn test_block_contents_required_fields(
873 set_one_field_to_none: fn(&mut fheapdump_client::BlockContents),
874 ) -> Result<Snapshot, Error> {
875 let client = create_client();
876 let (receiver_proxy, receiver_stream) =
877 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
878 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
879
880 let mut block_contents = fheapdump_client::BlockContents {
882 address: Some(FAKE_ALLOCATION_1_ADDRESS),
883 contents: Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()),
884 ..Default::default()
885 };
886
887 set_one_field_to_none(&mut block_contents);
889
890 let fut = receiver_proxy.batch(&[
892 fheapdump_client::SnapshotElement::BlockContents(block_contents),
893 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
894 address: Some(FAKE_ALLOCATION_1_ADDRESS),
895 size: Some(FAKE_ALLOCATION_1_SIZE),
896 thread_info_key: Some(FAKE_THREAD_1_KEY),
897 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
898 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
899 ..Default::default()
900 }),
901 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
902 thread_info_key: Some(FAKE_THREAD_1_KEY),
903 koid: Some(FAKE_THREAD_1_KOID),
904 name: Some(FAKE_THREAD_1_NAME.to_string()),
905 ..Default::default()
906 }),
907 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
908 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
909 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
910 ..Default::default()
911 }),
912 ]);
913 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
917 let _ = fut.await; receive_worker.await
921 }
922
923 #[fasync::run_singlethreaded(test)]
924 async fn test_conflicting_allocations() {
925 let client = create_client();
926 let (receiver_proxy, receiver_stream) =
927 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
928 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
929
930 let fut = receiver_proxy.batch(&[
932 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
933 address: Some(FAKE_ALLOCATION_1_ADDRESS),
934 size: Some(FAKE_ALLOCATION_1_SIZE),
935 thread_info_key: Some(FAKE_THREAD_1_KEY),
936 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
937 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
938 ..Default::default()
939 }),
940 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
941 address: Some(FAKE_ALLOCATION_1_ADDRESS),
942 size: Some(FAKE_ALLOCATION_1_SIZE),
943 thread_info_key: Some(FAKE_THREAD_1_KEY),
944 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
945 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
946 ..Default::default()
947 }),
948 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
949 thread_info_key: Some(FAKE_THREAD_1_KEY),
950 koid: Some(FAKE_THREAD_1_KOID),
951 name: Some(FAKE_THREAD_1_NAME.to_string()),
952 ..Default::default()
953 }),
954 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
955 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
956 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
957 ..Default::default()
958 }),
959 ]);
960 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
964 let _ = fut.await; assert_matches!(
968 receive_worker.await,
969 Err(Error::ConflictingElement { element_type: "Allocation" })
970 );
971 }
972
973 #[fasync::run_singlethreaded(test)]
974 async fn test_conflicting_executable_regions() {
975 let client = create_client();
976 let (receiver_proxy, receiver_stream) =
977 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
978 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
979
980 let fut = receiver_proxy.batch(&[
982 fheapdump_client::SnapshotElement::ExecutableRegion(
983 fheapdump_client::ExecutableRegion {
984 address: Some(FAKE_REGION_1_ADDRESS),
985 size: Some(FAKE_REGION_1_SIZE),
986 file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
987 build_id: Some(fheapdump_client::BuildId {
988 value: FAKE_REGION_1_BUILD_ID.to_vec(),
989 }),
990 ..Default::default()
991 },
992 ),
993 fheapdump_client::SnapshotElement::ExecutableRegion(
994 fheapdump_client::ExecutableRegion {
995 address: Some(FAKE_REGION_1_ADDRESS),
996 size: Some(FAKE_REGION_1_SIZE),
997 file_offset: Some(FAKE_REGION_1_FILE_OFFSET),
998 build_id: Some(fheapdump_client::BuildId {
999 value: FAKE_REGION_1_BUILD_ID.to_vec(),
1000 }),
1001 ..Default::default()
1002 },
1003 ),
1004 ]);
1005 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
1009 let _ = fut.await; assert_matches!(
1013 receive_worker.await,
1014 Err(Error::ConflictingElement { element_type: "ExecutableRegion" })
1015 );
1016 }
1017
1018 #[fasync::run_singlethreaded(test)]
1019 async fn test_block_contents_wrong_size() {
1020 let client = create_client();
1021 let (receiver_proxy, receiver_stream) =
1022 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1023 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1024
1025 let contents_with_wrong_size = vec![0; FAKE_ALLOCATION_1_SIZE as usize + 1];
1027 let fut = receiver_proxy.batch(&[
1028 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1029 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1030 size: Some(FAKE_ALLOCATION_1_SIZE),
1031 thread_info_key: Some(FAKE_THREAD_1_KEY),
1032 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1033 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1034 ..Default::default()
1035 }),
1036 fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1037 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1038 contents: Some(contents_with_wrong_size),
1039 ..Default::default()
1040 }),
1041 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1042 thread_info_key: Some(FAKE_THREAD_1_KEY),
1043 koid: Some(FAKE_THREAD_1_KOID),
1044 name: Some(FAKE_THREAD_1_NAME.to_string()),
1045 ..Default::default()
1046 }),
1047 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1048 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1049 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1050 ..Default::default()
1051 }),
1052 ]);
1053 let _ = fut.await; let fut = receiver_proxy.batch(&[]);
1057 let _ = fut.await; assert_matches!(
1061 receive_worker.await,
1062 Err(Error::ConflictingElement { element_type: "BlockContents" })
1063 );
1064 }
1065
1066 #[fasync::run_singlethreaded(test)]
1067 async fn test_empty_stack_trace() {
1068 let client = create_client();
1069 let (receiver_proxy, receiver_stream) =
1070 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1071 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1072
1073 let fut = receiver_proxy.batch(&[
1075 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1076 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1077 size: Some(FAKE_ALLOCATION_1_SIZE),
1078 thread_info_key: Some(FAKE_THREAD_1_KEY),
1079 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1080 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1081 ..Default::default()
1082 }),
1083 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1084 thread_info_key: Some(FAKE_THREAD_1_KEY),
1085 koid: Some(FAKE_THREAD_1_KOID),
1086 name: Some(FAKE_THREAD_1_NAME.to_string()),
1087 ..Default::default()
1088 }),
1089 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1090 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1091 program_addresses: Some(vec![]),
1092 ..Default::default()
1093 }),
1094 ]);
1095 fut.await.unwrap();
1096
1097 let fut = receiver_proxy.batch(&[]);
1099 fut.await.unwrap();
1100
1101 let received_snapshot = receive_worker.await.unwrap();
1103 let allocation1 = received_snapshot
1104 .allocations
1105 .iter()
1106 .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1107 .unwrap();
1108 assert_eq!(allocation1.stack_trace.program_addresses, []);
1109 }
1110
1111 #[fasync::run_singlethreaded(test)]
1112 async fn test_chunked_stack_trace() {
1113 let client = create_client();
1114 let (receiver_proxy, receiver_stream) =
1115 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1116 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1117
1118 let fut = receiver_proxy.batch(&[
1120 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1121 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1122 size: Some(FAKE_ALLOCATION_1_SIZE),
1123 thread_info_key: Some(FAKE_THREAD_1_KEY),
1124 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1125 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1126 ..Default::default()
1127 }),
1128 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1129 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1130 program_addresses: Some(vec![1111, 2222]),
1131 ..Default::default()
1132 }),
1133 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1134 thread_info_key: Some(FAKE_THREAD_1_KEY),
1135 koid: Some(FAKE_THREAD_1_KOID),
1136 name: Some(FAKE_THREAD_1_NAME.to_string()),
1137 ..Default::default()
1138 }),
1139 ]);
1140 fut.await.unwrap();
1141
1142 let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::StackTrace(
1144 fheapdump_client::StackTrace {
1145 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1146 program_addresses: Some(vec![3333]),
1147 ..Default::default()
1148 },
1149 )]);
1150 fut.await.unwrap();
1151
1152 let fut = receiver_proxy.batch(&[]);
1154 fut.await.unwrap();
1155
1156 let received_snapshot = receive_worker.await.unwrap();
1158 let allocation1 = received_snapshot
1159 .allocations
1160 .iter()
1161 .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1162 .unwrap();
1163 assert_eq!(allocation1.stack_trace.program_addresses, [1111, 2222, 3333]);
1164 }
1165
1166 #[fasync::run_singlethreaded(test)]
1167 async fn test_empty_block_contents() {
1168 let client = create_client();
1169 let (receiver_proxy, receiver_stream) =
1170 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1171 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1172
1173 let fut = receiver_proxy.batch(&[
1175 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1176 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1177 size: Some(0),
1178 thread_info_key: Some(FAKE_THREAD_1_KEY),
1179 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1180 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1181 ..Default::default()
1182 }),
1183 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1184 thread_info_key: Some(FAKE_THREAD_1_KEY),
1185 koid: Some(FAKE_THREAD_1_KOID),
1186 name: Some(FAKE_THREAD_1_NAME.to_string()),
1187 ..Default::default()
1188 }),
1189 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1190 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1191 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1192 ..Default::default()
1193 }),
1194 fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1195 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1196 contents: Some(vec![]),
1197 ..Default::default()
1198 }),
1199 ]);
1200 fut.await.unwrap();
1201
1202 let fut = receiver_proxy.batch(&[]);
1204 fut.await.unwrap();
1205
1206 let received_snapshot = receive_worker.await.unwrap();
1208 let allocation1 = received_snapshot
1209 .allocations
1210 .iter()
1211 .find(|alloc| alloc.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1212 .unwrap();
1213 assert_eq!(allocation1.contents.as_ref().expect("contents must be set"), &vec![]);
1214 }
1215
1216 #[fasync::run_singlethreaded(test)]
1217 async fn test_chunked_block_contents() {
1218 let client = create_client();
1219 let (receiver_proxy, receiver_stream) =
1220 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1221 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1222
1223 let (content_first_chunk, contents_second_chunk) =
1225 FAKE_ALLOCATION_1_CONTENTS.split_at(FAKE_ALLOCATION_1_CONTENTS.len() / 2);
1226
1227 let fut = receiver_proxy.batch(&[
1229 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1230 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1231 size: Some(FAKE_ALLOCATION_1_SIZE),
1232 thread_info_key: Some(FAKE_THREAD_1_KEY),
1233 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1234 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1235 ..Default::default()
1236 }),
1237 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1238 thread_info_key: Some(FAKE_THREAD_1_KEY),
1239 koid: Some(FAKE_THREAD_1_KOID),
1240 name: Some(FAKE_THREAD_1_NAME.to_string()),
1241 ..Default::default()
1242 }),
1243 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1244 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1245 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1246 ..Default::default()
1247 }),
1248 fheapdump_client::SnapshotElement::BlockContents(fheapdump_client::BlockContents {
1249 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1250 contents: Some(content_first_chunk.to_vec()),
1251 ..Default::default()
1252 }),
1253 ]);
1254 fut.await.unwrap();
1255
1256 let fut = receiver_proxy.batch(&[fheapdump_client::SnapshotElement::BlockContents(
1258 fheapdump_client::BlockContents {
1259 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1260 contents: Some(contents_second_chunk.to_vec()),
1261 ..Default::default()
1262 },
1263 )]);
1264 fut.await.unwrap();
1265
1266 let fut = receiver_proxy.batch(&[]);
1268 fut.await.unwrap();
1269
1270 let received_snapshot = receive_worker.await.unwrap();
1272 let allocation1 = received_snapshot
1273 .allocations
1274 .iter()
1275 .find(|a| a.address == Some(FAKE_ALLOCATION_1_ADDRESS))
1276 .unwrap();
1277 assert_eq!(allocation1.contents, Some(FAKE_ALLOCATION_1_CONTENTS.to_vec()));
1278 }
1279
1280 #[fasync::run_singlethreaded(test)]
1281 async fn test_missing_end_of_stream() {
1282 let client = create_client();
1283 let (receiver_proxy, receiver_stream) =
1284 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1285 let receive_worker = fasync::Task::local(Snapshot::receive_single_from(receiver_stream));
1286
1287 let fut = receiver_proxy.batch(&[
1289 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1290 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1291 size: Some(FAKE_ALLOCATION_1_SIZE),
1292 thread_info_key: Some(FAKE_THREAD_1_KEY),
1293 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1294 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1295 ..Default::default()
1296 }),
1297 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1298 thread_info_key: Some(FAKE_THREAD_1_KEY),
1299 koid: Some(FAKE_THREAD_1_KOID),
1300 name: Some(FAKE_THREAD_1_NAME.to_string()),
1301 ..Default::default()
1302 }),
1303 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1304 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1305 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1306 ..Default::default()
1307 }),
1308 ]);
1309 fut.await.unwrap();
1310
1311 std::mem::drop(receiver_proxy);
1313
1314 assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1316 }
1317
1318 #[fasync::run_singlethreaded(test)]
1319 async fn test_multi_contents() {
1320 let client = create_client();
1321 let (receiver_proxy, receiver_stream) =
1322 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1323 let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1324
1325 for koid in [1111, 2222] {
1327 receiver_proxy
1328 .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1329 fheapdump_client::SnapshotHeader {
1330 process_name: Some(format!("test-process-{koid}")),
1331 process_koid: Some(koid),
1332 ..Default::default()
1333 },
1334 )])
1335 .await
1336 .unwrap();
1337
1338 receiver_proxy
1339 .batch(&[
1340 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1341 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1342 size: Some(FAKE_ALLOCATION_1_SIZE),
1343 thread_info_key: Some(FAKE_THREAD_1_KEY),
1344 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1345 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1346 ..Default::default()
1347 }),
1348 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1349 thread_info_key: Some(FAKE_THREAD_1_KEY),
1350 koid: Some(FAKE_THREAD_1_KOID),
1351 name: Some(FAKE_THREAD_1_NAME.to_string()),
1352 ..Default::default()
1353 }),
1354 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1355 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1356 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1357 ..Default::default()
1358 }),
1359 ])
1360 .await
1361 .unwrap();
1362
1363 receiver_proxy.batch(&[]).await.unwrap(); }
1365
1366 receiver_proxy.batch(&[]).await.unwrap();
1368
1369 let received_snapshots = receive_worker.await.unwrap();
1371 assert_eq!(received_snapshots.len(), 2);
1372 assert_eq!(received_snapshots[0].process_name, "test-process-1111");
1373 assert_eq!(received_snapshots[0].process_koid, 1111);
1374 assert_eq!(received_snapshots[1].process_name, "test-process-2222");
1375 assert_eq!(received_snapshots[1].process_koid, 2222);
1376 }
1377
1378 #[fasync::run_singlethreaded(test)]
1379 async fn test_multi_missing_end_of_stream() {
1380 let client = create_client();
1381 let (receiver_proxy, receiver_stream) =
1382 client.create_proxy_and_stream::<fheapdump_client::SnapshotReceiverMarker>();
1383 let receive_worker = fasync::Task::local(Snapshot::receive_multi_from(receiver_stream));
1384
1385 for koid in [1111, 2222] {
1387 receiver_proxy
1388 .batch(&[fheapdump_client::SnapshotElement::SnapshotHeader(
1389 fheapdump_client::SnapshotHeader {
1390 process_name: Some("test-process-name".to_string()),
1391 process_koid: Some(koid),
1392 ..Default::default()
1393 },
1394 )])
1395 .await
1396 .unwrap();
1397
1398 receiver_proxy
1399 .batch(&[
1400 fheapdump_client::SnapshotElement::Allocation(fheapdump_client::Allocation {
1401 address: Some(FAKE_ALLOCATION_1_ADDRESS),
1402 size: Some(FAKE_ALLOCATION_1_SIZE),
1403 thread_info_key: Some(FAKE_THREAD_1_KEY),
1404 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1405 timestamp: Some(FAKE_ALLOCATION_1_TIMESTAMP),
1406 ..Default::default()
1407 }),
1408 fheapdump_client::SnapshotElement::ThreadInfo(fheapdump_client::ThreadInfo {
1409 thread_info_key: Some(FAKE_THREAD_1_KEY),
1410 koid: Some(FAKE_THREAD_1_KOID),
1411 name: Some(FAKE_THREAD_1_NAME.to_string()),
1412 ..Default::default()
1413 }),
1414 fheapdump_client::SnapshotElement::StackTrace(fheapdump_client::StackTrace {
1415 stack_trace_key: Some(FAKE_STACK_TRACE_1_KEY),
1416 program_addresses: Some(FAKE_STACK_TRACE_1_ADDRESSES.to_vec()),
1417 ..Default::default()
1418 }),
1419 ])
1420 .await
1421 .unwrap();
1422
1423 receiver_proxy.batch(&[]).await.unwrap(); }
1425
1426 std::mem::drop(receiver_proxy);
1428
1429 assert_matches!(receive_worker.await, Err(Error::UnexpectedEndOfStream));
1431 }
1432}