1use fidl_fuchsia_hardware_audio::*;
6use fuchsia_async as fasync;
7use futures::task::{Context, Poll, Waker};
8use futures::FutureExt;
9use log::debug;
10use std::pin::Pin;
11use zx::{self as zx, HandleBased};
12
13use crate::stream_config::frames_from_duration;
14use crate::types::{AudioSampleFormat, Error, Result};
15
16pub(crate) struct FrameVmo {
21 vmo: zx::Vmo,
24
25 size: usize,
27
28 start_time: Option<fasync::MonotonicInstant>,
32
33 waker: Option<Waker>,
36
37 timer: Option<Pin<Box<fasync::Timer>>>,
39
40 frames_per_second: u32,
42
43 format: Option<AudioSampleFormat>,
45
46 bytes_per_frame: usize,
48
49 frames_between_notifications: usize,
51
52 position_responder: Option<RingBufferWatchClockRecoveryPositionInfoResponder>,
56
57 next_notify_frame: usize,
60}
61
62impl FrameVmo {
63 pub(crate) fn new() -> Result<FrameVmo> {
64 Ok(FrameVmo {
65 vmo: zx::Vmo::create(0).map_err(|e| Error::IOError(e))?,
66 size: 0,
67 start_time: None,
68 waker: None,
69 frames_per_second: 0,
70 format: None,
71 timer: None,
72 bytes_per_frame: 0,
73 frames_between_notifications: 0,
74 position_responder: None,
75 next_notify_frame: usize::MAX,
76 })
77 }
78
79 pub(crate) fn set_format(
82 &mut self,
83 frames_per_second: u32,
84 format: AudioSampleFormat,
85 channels: u16,
86 frames: usize,
87 notifications_per_ring: u32,
88 ) -> Result<zx::Vmo> {
89 if self.start_time.is_some() {
90 return Err(Error::InvalidState);
91 }
92 let bytes_per_frame = format.compute_frame_size(channels as usize)?;
93 let new_size = bytes_per_frame * frames;
94 self.vmo = zx::Vmo::create(new_size as u64).map_err(|e| Error::IOError(e))?;
95 self.bytes_per_frame = bytes_per_frame;
96 self.size = new_size;
97 self.format = Some(format);
98 self.frames_per_second = frames_per_second;
99 if notifications_per_ring > 0 {
100 self.frames_between_notifications = frames / notifications_per_ring as usize;
104 }
105 Ok(self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(|e| Error::IOError(e))?)
106 }
107
108 pub(crate) fn set_position_responder(
109 &mut self,
110 position_responder: RingBufferWatchClockRecoveryPositionInfoResponder,
111 ) {
112 self.position_responder = Some(position_responder);
113 }
114
115 pub(crate) fn start(&mut self, time: fasync::MonotonicInstant) -> Result<()> {
118 if self.start_time.is_some() || self.format.is_none() {
119 return Err(Error::InvalidState);
120 }
121 self.start_time = Some(time);
122 if self.frames_between_notifications > 0 {
123 self.next_notify_frame = 0;
124 }
125 if let Some(w) = self.waker.take() {
126 debug!("ringing the waker to start");
127 w.wake();
128 }
129 Ok(())
130 }
131
132 pub(crate) fn stop(&mut self) -> Result<bool> {
135 if self.format.is_none() {
136 return Err(Error::InvalidState);
137 }
138 let start_time = self.start_time.take();
139 Ok(start_time.is_some())
140 }
141
142 fn reset_frame_timer(
147 &mut self,
148 deadline: fasync::MonotonicInstant,
149 cx: &mut Context<'_>,
150 ) -> Poll<()> {
151 let mut timer = Box::pin(fasync::Timer::new(deadline));
152 let Poll::Pending = timer.poll_unpin(cx) else {
153 return Poll::Ready(());
154 };
155 self.timer = Some(timer);
156 Poll::Pending
157 }
158
159 pub(crate) fn bytecount_frames(&self, num: usize) -> Option<usize> {
162 (self.bytes_per_frame != 0).then_some(self.bytes_per_frame * num)
163 }
164
165 fn poll_start(&mut self, cx: &mut Context<'_>) -> Poll<fasync::MonotonicInstant> {
166 match self.start_time.as_ref() {
167 None => {
168 self.waker = Some(cx.waker().clone());
169 Poll::Pending
170 }
171 Some(time) => Poll::Ready(*time),
172 }
173 }
174
175 fn frames(&self) -> usize {
178 self.size / self.bytes_per_frame
179 }
180
181 fn frame_idx(&self, frame_number: usize) -> usize {
184 frame_number % self.frames()
185 }
186
187 fn safe_frames_at(&self, time: fasync::MonotonicInstant) -> (usize, usize) {
193 let youngest_frame_count = self.frames_before(time);
194 let oldest_frame_count =
195 youngest_frame_count.saturating_sub(self.size / self.bytes_per_frame);
196 (oldest_frame_count, youngest_frame_count)
197 }
198
199 fn notify_position_responder(&mut self, frame: usize) {
202 if frame > self.next_notify_frame && self.position_responder.is_some() {
203 let start_time = self.start_time.unwrap();
204 let end_frame_time = start_time + self.duration_from_frames(frame);
205 let notify_frame_bytes = self.frame_idx(frame) * self.bytes_per_frame;
206 let info = RingBufferPositionInfo {
207 timestamp: end_frame_time.into_nanos(),
208 position: notify_frame_bytes as u32,
209 };
210 if let Some(Ok(())) = self.position_responder.take().map(|t| t.send(&info)) {
211 self.next_notify_frame += self.frames_between_notifications;
212 }
213 }
214 }
215
216 fn len_in_frames(&self, buf: &[u8]) -> Result<usize> {
219 match buf.len().checked_div(self.bytes_per_frame) {
220 None => Err(Error::InvalidState),
221 Some(0) => Err(Error::InvalidArgs),
222 Some(_) if (buf.len() % self.bytes_per_frame) != 0 => Err(Error::InvalidArgs),
223 Some(len) => Ok(len),
224 }
225 }
226
227 pub(crate) fn poll_write(
246 &mut self,
247 mut next_frame: usize,
248 mut buf: &[u8],
249 cx: &mut Context<'_>,
250 ) -> Poll<Result<(usize, usize)>> {
251 let start_time = futures::ready!(self.poll_start(cx));
252 let count = self.len_in_frames(buf)?;
253 let total_vmo_frames = self.frames();
254 if count > total_vmo_frames || count == 0 {
255 return Poll::Ready(Err(Error::InvalidArgs));
256 }
257
258 let frame_until = next_frame + count;
259
260 let now = fasync::MonotonicInstant::now();
261 let (_oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
262 if (current_frame_count + total_vmo_frames) < frame_until {
264 let deadline = start_time + self.duration_from_frames(frame_until - total_vmo_frames);
265 let () = futures::ready!(self.reset_frame_timer(deadline, cx));
266 return self.poll_write(next_frame, buf, cx);
268 }
269
270 let missing_frames = current_frame_count.saturating_sub(next_frame);
271
272 if missing_frames >= count {
273 self.notify_position_responder(frame_until);
276 return Poll::Ready(Ok((frame_until, count)));
277 } else if missing_frames > 0 {
278 next_frame = current_frame_count;
279 (_, buf) = buf.split_at(missing_frames * self.bytes_per_frame);
280 assert_eq!(frame_until - next_frame, self.len_in_frames(buf).unwrap());
281 }
282
283 let mut frame_from_idx = self.frame_idx(next_frame);
284 let frame_until_idx = self.frame_idx(frame_until);
285 let mut ndx = 0;
286
287 if frame_from_idx >= frame_until_idx {
289 let frames_to_write = total_vmo_frames - frame_from_idx;
290 let bytes_to_write = frames_to_write * self.bytes_per_frame;
291 let byte_start = frame_from_idx * self.bytes_per_frame;
292 self.vmo
293 .write(&buf[0..bytes_to_write], byte_start as u64)
294 .map_err(|e| Error::IOError(e))?;
295 frame_from_idx = 0;
296 ndx = bytes_to_write;
297 }
298
299 let byte_start = frame_from_idx * self.bytes_per_frame;
300
301 self.vmo.write(&buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
302
303 self.notify_position_responder(frame_until);
306 Poll::Ready(Ok((frame_until, missing_frames)))
307 }
308
309 pub(crate) fn poll_read(
324 &mut self,
325 mut next_frame: usize,
326 buf: &mut [u8],
327 cx: &mut Context<'_>,
328 ) -> Poll<Result<(usize, usize)>> {
329 let _start_time = futures::ready!(self.poll_start(cx));
330 let count = self.len_in_frames(buf)?;
331 let total_vmo_frames = self.frames();
332 if count > total_vmo_frames || count == 0 {
333 return Poll::Ready(Err(Error::InvalidArgs));
334 }
335
336 let now = fasync::MonotonicInstant::now();
337 let (oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
338
339 let missing_frames = oldest_frame_count.saturating_sub(next_frame);
340
341 if missing_frames > 0 {
342 next_frame = oldest_frame_count;
343 }
344
345 let frames_available = current_frame_count.saturating_sub(next_frame);
346 if frames_available < count {
347 let deadline = now + self.duration_from_frames(count - frames_available);
349 let () = futures::ready!(self.reset_frame_timer(deadline, cx));
350 return self.poll_read(next_frame, buf, cx);
352 }
353
354 let mut frame_from_idx = self.frame_idx(next_frame);
355 let frame_until = next_frame + count;
356 let frame_until_idx = self.frame_idx(frame_until);
357 let mut ndx = 0;
358
359 if frame_from_idx >= frame_until_idx {
361 let frames_to_read = total_vmo_frames - frame_from_idx;
362 let bytes_to_read = frames_to_read * self.bytes_per_frame;
363 let byte_start = frame_from_idx * self.bytes_per_frame;
364 self.vmo
365 .read(&mut buf[0..bytes_to_read], byte_start as u64)
366 .map_err(|e| Error::IOError(e))?;
367 frame_from_idx = 0;
368 ndx = bytes_to_read;
369 }
370
371 let byte_start = frame_from_idx * self.bytes_per_frame;
372
373 self.vmo.read(&mut buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
374
375 self.notify_position_responder(frame_until);
378 Poll::Ready(Ok((frame_until, missing_frames)))
379 }
380
381 fn frames_before(&self, time: fasync::MonotonicInstant) -> usize {
383 match self.start_time.as_ref() {
384 Some(start) if time > *start => self.frames_from_duration(time - *start) as usize,
385 _ => 0,
386 }
387 }
388
389 fn frames_from_duration(&self, duration: fasync::MonotonicDuration) -> usize {
391 frames_from_duration(self.frames_per_second as usize, duration)
392 }
393
394 fn duration_from_frames(&self, frames: usize) -> fasync::MonotonicDuration {
399 let fps = self.frames_per_second as i64;
400 let secs = frames as i64 / fps;
401 let leftover_frames = frames as i64 % fps;
402 let nanos = (leftover_frames * 1_000_000_000i64) / fps;
403 let roundup_nano = if 0 != ((leftover_frames * 1_000_000_000i64) % fps) { 1 } else { 0 };
404 zx::MonotonicDuration::from_seconds(secs)
405 + zx::MonotonicDuration::from_nanos(nanos + roundup_nano)
406 }
407}
408
409#[cfg(test)]
410mod tests {
411 use super::*;
412
413 use async_utils::PollExt;
414 use fixture::fixture;
415
416 const TEST_FORMAT: AudioSampleFormat = AudioSampleFormat::Eight { unsigned: false };
418 const TEST_CHANNELS: u16 = 1;
419 const TEST_FPS: u32 = 48000;
420 const TEST_FRAMES: usize = TEST_FPS as usize / 2;
421 const TEST_VMO_DURATION: fasync::MonotonicDuration =
423 fasync::MonotonicDuration::from_millis(500);
424
425 const ONE_FRAME_NANOS: i64 = 20833 + 1;
428 const TWO_FRAME_NANOS: i64 = 20833 * 2 + 1;
429 const THREE_FRAME_NANOS: i64 = 20833 * 3 + 1;
430
431 fn get_test_vmo(frames: usize) -> FrameVmo {
432 let mut vmo = FrameVmo::new().expect("can't make a framevmo");
433 let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, frames, 0).unwrap();
434 vmo
435 }
436
437 fn with_test_vmo<F>(_name: &str, test: F)
438 where
439 F: FnOnce(FrameVmo) -> (),
440 {
441 test(get_test_vmo(TEST_FRAMES))
442 }
443
444 #[fixture(with_test_vmo)]
445 #[fuchsia::test]
446 fn duration_from_frames(vmo: FrameVmo) {
447 assert_eq!(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS), vmo.duration_from_frames(1));
448 assert_eq!(zx::MonotonicDuration::from_nanos(TWO_FRAME_NANOS), vmo.duration_from_frames(2));
449 assert_eq!(
450 zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS),
451 vmo.duration_from_frames(3)
452 );
453
454 assert_eq!(
455 zx::MonotonicDuration::from_seconds(1),
456 vmo.duration_from_frames(TEST_FPS as usize)
457 );
458
459 assert_eq!(zx::MonotonicDuration::from_millis(1500), vmo.duration_from_frames(72000));
460 }
461
462 #[fixture(with_test_vmo)]
463 #[fuchsia::test]
464 fn frames_from_duration(vmo: FrameVmo) {
465 assert_eq!(0, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(0)));
466
467 assert_eq!(
468 0,
469 vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS - 1))
470 );
471 assert_eq!(1, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS)));
472
473 assert_eq!(
475 2,
476 vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1))
477 );
478 assert_eq!(
479 3,
480 vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
481 );
482 assert_eq!(
483 3,
484 vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS + 1))
485 );
486
487 assert_eq!(
488 TEST_FPS as usize,
489 vmo.frames_from_duration(zx::MonotonicDuration::from_seconds(1))
490 );
491 assert_eq!(72000, vmo.frames_from_duration(zx::MonotonicDuration::from_millis(1500)));
492
493 assert_eq!(10660, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(222084000)));
494 }
495
496 #[fuchsia::test]
497 fn start_stop() {
498 let _exec = fasync::TestExecutor::new();
499
500 let mut vmo = FrameVmo::new().expect("can't make a framevmo");
501
502 assert!(vmo.start(fasync::MonotonicInstant::now()).is_err());
504 assert!(vmo.stop().is_err());
506
507 let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, TEST_FRAMES, 0).unwrap();
508
509 let start_time = fasync::MonotonicInstant::now();
510 vmo.start(start_time).unwrap();
511 match vmo.start(start_time) {
512 Err(Error::InvalidState) => {}
513 x => panic!("Expected Err(InvalidState) from double start but got {:?}", x),
514 };
515
516 assert_eq!(true, vmo.stop().unwrap());
517 assert_eq!(false, vmo.stop().unwrap());
519
520 vmo.start(start_time).unwrap();
521 }
522
523 fn frames_before_exact(
524 vmo: &mut FrameVmo,
525 time_nanos: i64,
526 duration: zx::MonotonicDuration,
527 frames: usize,
528 ) {
529 let _ = vmo.stop();
530 let start_time = fasync::MonotonicInstant::from_nanos(time_nanos);
531 vmo.start(start_time).unwrap();
532 assert_eq!(frames, vmo.frames_before(start_time + duration));
533 }
534
535 #[fixture(with_test_vmo)]
536 #[fuchsia::test]
537 fn frames_before(mut vmo: FrameVmo) {
538 let _exec = fasync::TestExecutor::new();
539
540 let start_time = fasync::MonotonicInstant::now();
541 vmo.start(start_time).unwrap();
542
543 assert_eq!(0, vmo.frames_before(start_time));
544
545 assert_eq!(
546 1,
547 vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS))
548 );
549 assert_eq!(
550 2,
551 vmo.frames_before(
552 start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1)
553 )
554 );
555 assert_eq!(
556 3,
557 vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
558 );
559
560 assert_eq!(
561 TEST_FPS as usize / 4,
562 vmo.frames_before(start_time + zx::MonotonicDuration::from_millis(250))
563 );
564
565 let three_quarters_dur = zx::MonotonicDuration::from_millis(375);
566 assert_eq!(3 * TEST_FPS as usize / 8, vmo.frames_before(start_time + three_quarters_dur));
567
568 assert_eq!(
569 10521,
570 vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(219188000))
571 );
572
573 frames_before_exact(
574 &mut vmo,
575 273533747037,
576 zx::MonotonicDuration::from_nanos(219188000),
577 10521,
578 );
579 frames_before_exact(
580 &mut vmo,
581 714329925362,
582 zx::MonotonicDuration::from_nanos(219292000),
583 10526,
584 );
585 }
586
587 #[fixture(with_test_vmo)]
588 #[fuchsia::test]
589 fn poll_read(mut vmo: FrameVmo) {
590 let exec = fasync::TestExecutor::new_with_fake_time();
591 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
592
593 let start_time = fasync::MonotonicInstant::now();
594 vmo.start(start_time).unwrap();
595
596 let half_dur = TEST_VMO_DURATION / 2;
597 const QUART_FRAMES: usize = TEST_FRAMES / 4;
598 let mut quart_frames_buf = [0; QUART_FRAMES];
599 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
600
601 let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
602
603 let res = vmo.poll_read(0, &mut quart_frames_buf, &mut no_wake_cx);
604 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
605
606 assert_eq!(0, missed);
607 assert_eq!(frame_idx, QUART_FRAMES);
610
611 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
613
614 let mut full_buf = [0; TEST_FRAMES];
615 let res = vmo.poll_read(0, &mut full_buf, &mut no_wake_cx);
616 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
617
618 assert_eq!(0, missed);
619 assert_eq!(frame_idx, TEST_FRAMES);
621
622 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
626
627 let res = vmo.poll_read(TEST_FRAMES / 2, &mut full_buf, &mut no_wake_cx);
631 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
632
633 assert_eq!(0, missed);
634 assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
635
636 let res = vmo.poll_read(frame_idx - QUART_FRAMES, &mut quart_frames_buf, &mut no_wake_cx);
641 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
642
643 assert_eq!(0, missed);
644 assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
645 }
646
647 #[fixture(with_test_vmo)]
648 #[fuchsia::test]
649 fn poll_read_waking(mut vmo: FrameVmo) {
650 let mut exec = fasync::TestExecutor::new_with_fake_time();
651 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
652
653 let half_dur = TEST_VMO_DURATION / 2;
654 const QUART_FRAMES: usize = TEST_FRAMES / 4;
655 let mut quart_frames_buf = [0; QUART_FRAMES];
656 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
657
658 let (waker, count) = futures_test::task::new_count_waker();
659 let mut counting_wake_cx = Context::from_waker(&waker);
660
661 let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
663 res.expect_pending("should be pending before start");
664
665 let start_time = fasync::MonotonicInstant::now();
666 vmo.start(start_time).unwrap();
667
668 assert_eq!(count, 1);
670
671 let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
673 res.expect_pending("should be pending before start");
674
675 let new_time = fasync::MonotonicInstant::after(half_dur);
678 exec.set_fake_time(new_time);
679 assert!(exec.wake_expired_timers());
680
681 assert_eq!(count, 2);
682
683 let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
685 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
686 assert_eq!(0, missed);
687 assert_eq!(frame_idx, QUART_FRAMES);
688
689 let mut half_frames_buf = [0; TEST_FRAMES / 2];
692 let res = vmo.poll_read(frame_idx, &mut half_frames_buf, &mut counting_wake_cx);
693 res.expect_pending("should be pending because not enough data");
694
695 assert_eq!(count, 2);
696 }
697
698 #[fuchsia::test]
699 fn multibyte_poll_read() {
700 let exec = fasync::TestExecutor::new_with_fake_time();
701 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
702 let mut vmo = FrameVmo::new().expect("can't make a framevmo");
703 let format = AudioSampleFormat::Sixteen { unsigned: false, invert_endian: false };
704 let frames = TEST_FPS as usize / 2;
705
706 assert!(vmo.bytecount_frames(10).is_none());
708
709 let _handle = vmo.set_format(TEST_FPS, format, 2, frames, 0).unwrap();
710
711 let half_dur = zx::MonotonicDuration::from_millis(250);
712
713 let start_time = fasync::MonotonicInstant::now() - half_dur;
715 vmo.start(start_time).unwrap();
716
717 let bytecount =
718 vmo.bytecount_frames(frames / 2).expect("a bytecount should exist after format");
719 let mut half_frames_buf = vec![0; bytecount];
720
721 let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
722 let res = vmo.poll_read(0, half_frames_buf.as_mut_slice(), &mut no_wake_cx);
723 let (idx, missed) = res.expect("frames should be ready").expect("no error");
724
725 assert_eq!(0, missed);
726 assert_eq!(idx, frames / 2);
728 }
729
730 #[fixture(with_test_vmo)]
731 #[fuchsia::test]
732 fn poll_read_boundaries(mut vmo: FrameVmo) {
733 let exec = fasync::TestExecutor::new_with_fake_time();
734 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
735 let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
736
737 let start_time = fasync::MonotonicInstant::now();
738
739 vmo.start(start_time).unwrap();
740
741 exec.set_fake_time(
743 start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS)
744 - zx::MonotonicDuration::from_nanos(1),
745 );
746 let mut one_frame_buf = [0; 1];
747 vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx)
748 .expect_pending("third frame shouldn't be ready");
749 exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS));
751 let res = vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx);
752 let (idx, missed) = res.expect("third frame should be ready").expect("no error");
753
754 assert_eq!(0, missed);
755 assert_eq!(3, idx);
757
758 let much_later_ns = 3999 * THREE_FRAME_NANOS;
760 let next_frame_idx = 3998 * 3 + 2;
761 exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(much_later_ns));
762 let res = vmo.poll_read(next_frame_idx, &mut one_frame_buf, &mut no_wake_cx);
763 let (idx, missed) = res.expect("frame should be ready").expect("no error");
764 assert_eq!(0, missed);
765 assert_eq!(next_frame_idx + 1, idx);
767
768 let mut all_frames_len = 0;
769 let mut total_duration = zx::MonotonicDuration::from_nanos(0);
770
771 let moment_length = zx::MonotonicDuration::from_nanos(10_000);
772
773 let mut moment_start = start_time;
774 let mut moment_end = moment_start;
775 let mut next_index = 0;
776
777 let mut ten_frames_buf = [0; 10];
778
779 while total_duration < zx::MonotonicDuration::from_millis(250) {
780 moment_end += moment_length;
781 exec.set_fake_time(moment_end);
782 total_duration += moment_length;
783 let Poll::Ready(res) = vmo.poll_read(next_index, &mut ten_frames_buf, &mut no_wake_cx)
785 else {
786 continue;
787 };
788 let (last_idx, missed) = res.expect("no error");
789 assert_eq!(0, missed);
790 all_frames_len += ten_frames_buf.len();
791 assert_eq!(
792 all_frames_len,
793 vmo.frames_before(moment_end + zx::MonotonicDuration::from_nanos(1)),
794 "frame miscount after {:?} - {:?} moment",
795 moment_start,
796 moment_end
797 );
798 moment_start = moment_end;
799 next_index = last_idx;
800 }
801
802 assert_eq!(TEST_FRAMES / 2, all_frames_len, "should be a quarter second worth of frames");
803 }
804
805 #[fixture(with_test_vmo)]
806 #[fuchsia::test]
807 fn poll_write(mut vmo: FrameVmo) {
808 let exec = fasync::TestExecutor::new_with_fake_time();
809 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
810
811 let start_time = fasync::MonotonicInstant::now();
812 vmo.start(start_time).unwrap();
813
814 let half_dur = TEST_VMO_DURATION / 2;
815 const QUART_FRAMES: usize = TEST_FRAMES / 4;
816 let quart_frames_buf = [0; QUART_FRAMES];
817
818 let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
819
820 let res = vmo.poll_write(0, &quart_frames_buf, &mut no_wake_cx);
822 let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
823
824 assert_eq!(0, missed);
825 assert_eq!(frame_idx, QUART_FRAMES);
828
829 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur / 2));
833
834 let full_buf = [0; TEST_FRAMES];
835 let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
836 let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
837
838 assert_eq!(0, missed);
839 assert_eq!(frame_idx, TEST_FRAMES + QUART_FRAMES);
841
842 exec.set_fake_time(fasync::MonotonicInstant::after(TEST_VMO_DURATION));
846
847 let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
851 let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
852
853 assert_eq!(0, missed);
854 assert_eq!(frame_idx, TEST_FRAMES * 2 + QUART_FRAMES);
855
856 let res = vmo.poll_write(TEST_FRAMES + TEST_FRAMES, &quart_frames_buf, &mut no_wake_cx);
860 let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
861
862 assert_eq!(0, missed);
863 assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES + QUART_FRAMES);
864 }
865
866 #[fixture(with_test_vmo)]
867 #[fuchsia::test]
868 fn poll_write_waking(mut vmo: FrameVmo) {
869 let mut exec = fasync::TestExecutor::new_with_fake_time();
870 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
871
872 let half_dur = TEST_VMO_DURATION / 2;
873 const QUART_FRAMES: usize = TEST_FRAMES / 4;
874 let quart_frames_buf = [0; QUART_FRAMES];
875 exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
876
877 let (waker, count) = futures_test::task::new_count_waker();
878 let mut counting_wake_cx = Context::from_waker(&waker);
879
880 let res = vmo.poll_write(0, &quart_frames_buf, &mut counting_wake_cx);
882 res.expect_pending("should be pending before start");
883
884 let start_time = fasync::MonotonicInstant::now();
885 vmo.start(start_time).unwrap();
886
887 assert_eq!(count, 1);
889
890 let mut idx = 0;
894 for _ in 0..4 {
895 let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
896 let (new_idx, missed) = res.expect("should be writable just after start").expect("ok");
897 assert_eq!(0, missed);
898 assert!(new_idx > idx);
899 idx = new_idx;
900 }
901
902 let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
904 res.expect_pending("should have to wait for time to go by");
905
906 let new_time = fasync::MonotonicInstant::after(half_dur);
909 exec.set_fake_time(new_time);
910 assert!(exec.wake_expired_timers());
911
912 assert_eq!(count, 2);
913
914 for _ in 0..2 {
916 let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
917 let (new_idx, missed) = res.expect("frames writable").expect("ok");
918 assert_eq!(0, missed);
919 assert!(new_idx >= idx);
920 idx = new_idx;
921 }
922
923 let half_frames_buf = [0; TEST_FRAMES / 2];
926 let res = vmo.poll_write(idx - QUART_FRAMES, &half_frames_buf, &mut counting_wake_cx);
927 res.expect_pending("should be pending because not enough space");
928
929 assert_eq!(count, 2);
930 }
931}