use fidl_fuchsia_hardware_audio::*;
use fuchsia_async as fasync;
use futures::task::{Context, Poll, Waker};
use futures::FutureExt;
use log::debug;
use std::pin::Pin;
use zx::{self as zx, HandleBased};
use crate::stream_config::frames_from_duration;
use crate::types::{AudioSampleFormat, Error, Result};
pub(crate) struct FrameVmo {
vmo: zx::Vmo,
size: usize,
start_time: Option<fasync::MonotonicInstant>,
waker: Option<Waker>,
timer: Option<Pin<Box<fasync::Timer>>>,
frames_per_second: u32,
format: Option<AudioSampleFormat>,
bytes_per_frame: usize,
frames_between_notifications: usize,
position_responder: Option<RingBufferWatchClockRecoveryPositionInfoResponder>,
next_notify_frame: usize,
}
impl FrameVmo {
pub(crate) fn new() -> Result<FrameVmo> {
Ok(FrameVmo {
vmo: zx::Vmo::create(0).map_err(|e| Error::IOError(e))?,
size: 0,
start_time: None,
waker: None,
frames_per_second: 0,
format: None,
timer: None,
bytes_per_frame: 0,
frames_between_notifications: 0,
position_responder: None,
next_notify_frame: usize::MAX,
})
}
pub(crate) fn set_format(
&mut self,
frames_per_second: u32,
format: AudioSampleFormat,
channels: u16,
frames: usize,
notifications_per_ring: u32,
) -> Result<zx::Vmo> {
if self.start_time.is_some() {
return Err(Error::InvalidState);
}
let bytes_per_frame = format.compute_frame_size(channels as usize)?;
let new_size = bytes_per_frame * frames;
self.vmo = zx::Vmo::create(new_size as u64).map_err(|e| Error::IOError(e))?;
self.bytes_per_frame = bytes_per_frame;
self.size = new_size;
self.format = Some(format);
self.frames_per_second = frames_per_second;
if notifications_per_ring > 0 {
self.frames_between_notifications = frames / notifications_per_ring as usize;
}
Ok(self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(|e| Error::IOError(e))?)
}
pub(crate) fn set_position_responder(
&mut self,
position_responder: RingBufferWatchClockRecoveryPositionInfoResponder,
) {
self.position_responder = Some(position_responder);
}
pub(crate) fn start(&mut self, time: fasync::MonotonicInstant) -> Result<()> {
if self.start_time.is_some() || self.format.is_none() {
return Err(Error::InvalidState);
}
self.start_time = Some(time);
if self.frames_between_notifications > 0 {
self.next_notify_frame = 0;
}
if let Some(w) = self.waker.take() {
debug!("ringing the waker to start");
w.wake();
}
Ok(())
}
pub(crate) fn stop(&mut self) -> Result<bool> {
if self.format.is_none() {
return Err(Error::InvalidState);
}
let start_time = self.start_time.take();
Ok(start_time.is_some())
}
fn reset_frame_timer(
&mut self,
deadline: fasync::MonotonicInstant,
cx: &mut Context<'_>,
) -> Poll<()> {
let mut timer = Box::pin(fasync::Timer::new(deadline));
let Poll::Pending = timer.poll_unpin(cx) else {
return Poll::Ready(());
};
self.timer = Some(timer);
Poll::Pending
}
pub(crate) fn bytecount_frames(&self, num: usize) -> Option<usize> {
(self.bytes_per_frame != 0).then_some(self.bytes_per_frame * num)
}
fn poll_start(&mut self, cx: &mut Context<'_>) -> Poll<fasync::MonotonicInstant> {
match self.start_time.as_ref() {
None => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
Some(time) => Poll::Ready(*time),
}
}
fn frames(&self) -> usize {
self.size / self.bytes_per_frame
}
fn frame_idx(&self, frame_number: usize) -> usize {
frame_number % self.frames()
}
fn safe_frames_at(&self, time: fasync::MonotonicInstant) -> (usize, usize) {
let youngest_frame_count = self.frames_before(time);
let oldest_frame_count =
youngest_frame_count.saturating_sub(self.size / self.bytes_per_frame);
(oldest_frame_count, youngest_frame_count)
}
fn notify_position_responder(&mut self, frame: usize) {
if frame > self.next_notify_frame && self.position_responder.is_some() {
let start_time = self.start_time.unwrap();
let end_frame_time = start_time + self.duration_from_frames(frame);
let notify_frame_bytes = self.frame_idx(frame) * self.bytes_per_frame;
let info = RingBufferPositionInfo {
timestamp: end_frame_time.into_nanos(),
position: notify_frame_bytes as u32,
};
if let Some(Ok(())) = self.position_responder.take().map(|t| t.send(&info)) {
self.next_notify_frame += self.frames_between_notifications;
}
}
}
fn len_in_frames(&self, buf: &[u8]) -> Result<usize> {
match buf.len().checked_div(self.bytes_per_frame) {
None => Err(Error::InvalidState),
Some(0) => Err(Error::InvalidArgs),
Some(_) if (buf.len() % self.bytes_per_frame) != 0 => Err(Error::InvalidArgs),
Some(len) => Ok(len),
}
}
pub(crate) fn poll_write(
&mut self,
mut next_frame: usize,
mut buf: &[u8],
cx: &mut Context<'_>,
) -> Poll<Result<(usize, usize)>> {
let start_time = futures::ready!(self.poll_start(cx));
let count = self.len_in_frames(buf)?;
let total_vmo_frames = self.frames();
if count > total_vmo_frames || count == 0 {
return Poll::Ready(Err(Error::InvalidArgs));
}
let frame_until = next_frame + count;
let now = fasync::MonotonicInstant::now();
let (_oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
if (current_frame_count + total_vmo_frames) < frame_until {
let deadline = start_time + self.duration_from_frames(frame_until - total_vmo_frames);
let () = futures::ready!(self.reset_frame_timer(deadline, cx));
return self.poll_write(next_frame, buf, cx);
}
let missing_frames = current_frame_count.saturating_sub(next_frame);
if missing_frames >= count {
self.notify_position_responder(frame_until);
return Poll::Ready(Ok((frame_until, count)));
} else if missing_frames > 0 {
next_frame = current_frame_count;
(_, buf) = buf.split_at(missing_frames * self.bytes_per_frame);
assert_eq!(frame_until - next_frame, self.len_in_frames(buf).unwrap());
}
let mut frame_from_idx = self.frame_idx(next_frame);
let frame_until_idx = self.frame_idx(frame_until);
let mut ndx = 0;
if frame_from_idx >= frame_until_idx {
let frames_to_write = total_vmo_frames - frame_from_idx;
let bytes_to_write = frames_to_write * self.bytes_per_frame;
let byte_start = frame_from_idx * self.bytes_per_frame;
self.vmo
.write(&buf[0..bytes_to_write], byte_start as u64)
.map_err(|e| Error::IOError(e))?;
frame_from_idx = 0;
ndx = bytes_to_write;
}
let byte_start = frame_from_idx * self.bytes_per_frame;
self.vmo.write(&buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
self.notify_position_responder(frame_until);
Poll::Ready(Ok((frame_until, missing_frames)))
}
pub(crate) fn poll_read(
&mut self,
mut next_frame: usize,
buf: &mut [u8],
cx: &mut Context<'_>,
) -> Poll<Result<(usize, usize)>> {
let _start_time = futures::ready!(self.poll_start(cx));
let count = self.len_in_frames(buf)?;
let total_vmo_frames = self.frames();
if count > total_vmo_frames || count == 0 {
return Poll::Ready(Err(Error::InvalidArgs));
}
let now = fasync::MonotonicInstant::now();
let (oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
let missing_frames = oldest_frame_count.saturating_sub(next_frame);
if missing_frames > 0 {
next_frame = oldest_frame_count;
}
let frames_available = current_frame_count.saturating_sub(next_frame);
if frames_available < count {
let deadline = now + self.duration_from_frames(count - frames_available);
let () = futures::ready!(self.reset_frame_timer(deadline, cx));
return self.poll_read(next_frame, buf, cx);
}
let mut frame_from_idx = self.frame_idx(next_frame);
let frame_until = next_frame + count;
let frame_until_idx = self.frame_idx(frame_until);
let mut ndx = 0;
if frame_from_idx >= frame_until_idx {
let frames_to_read = total_vmo_frames - frame_from_idx;
let bytes_to_read = frames_to_read * self.bytes_per_frame;
let byte_start = frame_from_idx * self.bytes_per_frame;
self.vmo
.read(&mut buf[0..bytes_to_read], byte_start as u64)
.map_err(|e| Error::IOError(e))?;
frame_from_idx = 0;
ndx = bytes_to_read;
}
let byte_start = frame_from_idx * self.bytes_per_frame;
self.vmo.read(&mut buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
self.notify_position_responder(frame_until);
Poll::Ready(Ok((frame_until, missing_frames)))
}
fn frames_before(&self, time: fasync::MonotonicInstant) -> usize {
match self.start_time.as_ref() {
Some(start) if time > *start => self.frames_from_duration(time - *start) as usize,
_ => 0,
}
}
fn frames_from_duration(&self, duration: fasync::MonotonicDuration) -> usize {
frames_from_duration(self.frames_per_second as usize, duration)
}
fn duration_from_frames(&self, frames: usize) -> fasync::MonotonicDuration {
let fps = self.frames_per_second as i64;
let secs = frames as i64 / fps;
let leftover_frames = frames as i64 % fps;
let nanos = (leftover_frames * 1_000_000_000i64) / fps;
let roundup_nano = if 0 != ((leftover_frames * 1_000_000_000i64) % fps) { 1 } else { 0 };
zx::MonotonicDuration::from_seconds(secs)
+ zx::MonotonicDuration::from_nanos(nanos + roundup_nano)
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use fixture::fixture;
const TEST_FORMAT: AudioSampleFormat = AudioSampleFormat::Eight { unsigned: false };
const TEST_CHANNELS: u16 = 1;
const TEST_FPS: u32 = 48000;
const TEST_FRAMES: usize = TEST_FPS as usize / 2;
const TEST_VMO_DURATION: fasync::MonotonicDuration =
fasync::MonotonicDuration::from_millis(500);
const ONE_FRAME_NANOS: i64 = 20833 + 1;
const TWO_FRAME_NANOS: i64 = 20833 * 2 + 1;
const THREE_FRAME_NANOS: i64 = 20833 * 3 + 1;
fn get_test_vmo(frames: usize) -> FrameVmo {
let mut vmo = FrameVmo::new().expect("can't make a framevmo");
let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, frames, 0).unwrap();
vmo
}
fn with_test_vmo<F>(_name: &str, test: F)
where
F: FnOnce(FrameVmo) -> (),
{
test(get_test_vmo(TEST_FRAMES))
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn duration_from_frames(vmo: FrameVmo) {
assert_eq!(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS), vmo.duration_from_frames(1));
assert_eq!(zx::MonotonicDuration::from_nanos(TWO_FRAME_NANOS), vmo.duration_from_frames(2));
assert_eq!(
zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS),
vmo.duration_from_frames(3)
);
assert_eq!(
zx::MonotonicDuration::from_seconds(1),
vmo.duration_from_frames(TEST_FPS as usize)
);
assert_eq!(zx::MonotonicDuration::from_millis(1500), vmo.duration_from_frames(72000));
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn frames_from_duration(vmo: FrameVmo) {
assert_eq!(0, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(0)));
assert_eq!(
0,
vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS - 1))
);
assert_eq!(1, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS)));
assert_eq!(
2,
vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1))
);
assert_eq!(
3,
vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
);
assert_eq!(
3,
vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS + 1))
);
assert_eq!(
TEST_FPS as usize,
vmo.frames_from_duration(zx::MonotonicDuration::from_seconds(1))
);
assert_eq!(72000, vmo.frames_from_duration(zx::MonotonicDuration::from_millis(1500)));
assert_eq!(10660, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(222084000)));
}
#[fuchsia::test]
fn start_stop() {
let _exec = fasync::TestExecutor::new();
let mut vmo = FrameVmo::new().expect("can't make a framevmo");
assert!(vmo.start(fasync::MonotonicInstant::now()).is_err());
assert!(vmo.stop().is_err());
let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, TEST_FRAMES, 0).unwrap();
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
match vmo.start(start_time) {
Err(Error::InvalidState) => {}
x => panic!("Expected Err(InvalidState) from double start but got {:?}", x),
};
assert_eq!(true, vmo.stop().unwrap());
assert_eq!(false, vmo.stop().unwrap());
vmo.start(start_time).unwrap();
}
fn frames_before_exact(
vmo: &mut FrameVmo,
time_nanos: i64,
duration: zx::MonotonicDuration,
frames: usize,
) {
let _ = vmo.stop();
let start_time = fasync::MonotonicInstant::from_nanos(time_nanos);
vmo.start(start_time).unwrap();
assert_eq!(frames, vmo.frames_before(start_time + duration));
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn frames_before(mut vmo: FrameVmo) {
let _exec = fasync::TestExecutor::new();
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
assert_eq!(0, vmo.frames_before(start_time));
assert_eq!(
1,
vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS))
);
assert_eq!(
2,
vmo.frames_before(
start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1)
)
);
assert_eq!(
3,
vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
);
assert_eq!(
TEST_FPS as usize / 4,
vmo.frames_before(start_time + zx::MonotonicDuration::from_millis(250))
);
let three_quarters_dur = zx::MonotonicDuration::from_millis(375);
assert_eq!(3 * TEST_FPS as usize / 8, vmo.frames_before(start_time + three_quarters_dur));
assert_eq!(
10521,
vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(219188000))
);
frames_before_exact(
&mut vmo,
273533747037,
zx::MonotonicDuration::from_nanos(219188000),
10521,
);
frames_before_exact(
&mut vmo,
714329925362,
zx::MonotonicDuration::from_nanos(219292000),
10526,
);
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn poll_read(mut vmo: FrameVmo) {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
let half_dur = TEST_VMO_DURATION / 2;
const QUART_FRAMES: usize = TEST_FRAMES / 4;
let mut quart_frames_buf = [0; QUART_FRAMES];
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
let res = vmo.poll_read(0, &mut quart_frames_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, QUART_FRAMES);
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
let mut full_buf = [0; TEST_FRAMES];
let res = vmo.poll_read(0, &mut full_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES);
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
let res = vmo.poll_read(TEST_FRAMES / 2, &mut full_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
let res = vmo.poll_read(frame_idx - QUART_FRAMES, &mut quart_frames_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn poll_read_waking(mut vmo: FrameVmo) {
let mut exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let half_dur = TEST_VMO_DURATION / 2;
const QUART_FRAMES: usize = TEST_FRAMES / 4;
let mut quart_frames_buf = [0; QUART_FRAMES];
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
let (waker, count) = futures_test::task::new_count_waker();
let mut counting_wake_cx = Context::from_waker(&waker);
let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
res.expect_pending("should be pending before start");
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
assert_eq!(count, 1);
let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
res.expect_pending("should be pending before start");
let new_time = fasync::MonotonicInstant::after(half_dur);
exec.set_fake_time(new_time);
assert!(exec.wake_expired_timers());
assert_eq!(count, 2);
let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, QUART_FRAMES);
let mut half_frames_buf = [0; TEST_FRAMES / 2];
let res = vmo.poll_read(frame_idx, &mut half_frames_buf, &mut counting_wake_cx);
res.expect_pending("should be pending because not enough data");
assert_eq!(count, 2);
}
#[fuchsia::test]
fn multibyte_poll_read() {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let mut vmo = FrameVmo::new().expect("can't make a framevmo");
let format = AudioSampleFormat::Sixteen { unsigned: false, invert_endian: false };
let frames = TEST_FPS as usize / 2;
assert!(vmo.bytecount_frames(10).is_none());
let _handle = vmo.set_format(TEST_FPS, format, 2, frames, 0).unwrap();
let half_dur = zx::MonotonicDuration::from_millis(250);
let start_time = fasync::MonotonicInstant::now() - half_dur;
vmo.start(start_time).unwrap();
let bytecount =
vmo.bytecount_frames(frames / 2).expect("a bytecount should exist after format");
let mut half_frames_buf = vec![0; bytecount];
let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
let res = vmo.poll_read(0, half_frames_buf.as_mut_slice(), &mut no_wake_cx);
let (idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(idx, frames / 2);
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn poll_read_boundaries(mut vmo: FrameVmo) {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
exec.set_fake_time(
start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS)
- zx::MonotonicDuration::from_nanos(1),
);
let mut one_frame_buf = [0; 1];
vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx)
.expect_pending("third frame shouldn't be ready");
exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS));
let res = vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx);
let (idx, missed) = res.expect("third frame should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(3, idx);
let much_later_ns = 3999 * THREE_FRAME_NANOS;
let next_frame_idx = 3998 * 3 + 2;
exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(much_later_ns));
let res = vmo.poll_read(next_frame_idx, &mut one_frame_buf, &mut no_wake_cx);
let (idx, missed) = res.expect("frame should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(next_frame_idx + 1, idx);
let mut all_frames_len = 0;
let mut total_duration = zx::MonotonicDuration::from_nanos(0);
let moment_length = zx::MonotonicDuration::from_nanos(10_000);
let mut moment_start = start_time;
let mut moment_end = moment_start;
let mut next_index = 0;
let mut ten_frames_buf = [0; 10];
while total_duration < zx::MonotonicDuration::from_millis(250) {
moment_end += moment_length;
exec.set_fake_time(moment_end);
total_duration += moment_length;
let Poll::Ready(res) = vmo.poll_read(next_index, &mut ten_frames_buf, &mut no_wake_cx)
else {
continue;
};
let (last_idx, missed) = res.expect("no error");
assert_eq!(0, missed);
all_frames_len += ten_frames_buf.len();
assert_eq!(
all_frames_len,
vmo.frames_before(moment_end + zx::MonotonicDuration::from_nanos(1)),
"frame miscount after {:?} - {:?} moment",
moment_start,
moment_end
);
moment_start = moment_end;
next_index = last_idx;
}
assert_eq!(TEST_FRAMES / 2, all_frames_len, "should be a quarter second worth of frames");
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn poll_write(mut vmo: FrameVmo) {
let exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
let half_dur = TEST_VMO_DURATION / 2;
const QUART_FRAMES: usize = TEST_FRAMES / 4;
let quart_frames_buf = [0; QUART_FRAMES];
let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
let res = vmo.poll_write(0, &quart_frames_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, QUART_FRAMES);
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur / 2));
let full_buf = [0; TEST_FRAMES];
let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES + QUART_FRAMES);
exec.set_fake_time(fasync::MonotonicInstant::after(TEST_VMO_DURATION));
let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES * 2 + QUART_FRAMES);
let res = vmo.poll_write(TEST_FRAMES + TEST_FRAMES, &quart_frames_buf, &mut no_wake_cx);
let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
assert_eq!(0, missed);
assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES + QUART_FRAMES);
}
#[fixture(with_test_vmo)]
#[fuchsia::test]
fn poll_write_waking(mut vmo: FrameVmo) {
let mut exec = fasync::TestExecutor::new_with_fake_time();
exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
let half_dur = TEST_VMO_DURATION / 2;
const QUART_FRAMES: usize = TEST_FRAMES / 4;
let quart_frames_buf = [0; QUART_FRAMES];
exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
let (waker, count) = futures_test::task::new_count_waker();
let mut counting_wake_cx = Context::from_waker(&waker);
let res = vmo.poll_write(0, &quart_frames_buf, &mut counting_wake_cx);
res.expect_pending("should be pending before start");
let start_time = fasync::MonotonicInstant::now();
vmo.start(start_time).unwrap();
assert_eq!(count, 1);
let mut idx = 0;
for _ in 0..4 {
let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
let (new_idx, missed) = res.expect("should be writable just after start").expect("ok");
assert_eq!(0, missed);
assert!(new_idx > idx);
idx = new_idx;
}
let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
res.expect_pending("should have to wait for time to go by");
let new_time = fasync::MonotonicInstant::after(half_dur);
exec.set_fake_time(new_time);
assert!(exec.wake_expired_timers());
assert_eq!(count, 2);
for _ in 0..2 {
let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
let (new_idx, missed) = res.expect("frames writable").expect("ok");
assert_eq!(0, missed);
assert!(new_idx >= idx);
idx = new_idx;
}
let half_frames_buf = [0; TEST_FRAMES / 2];
let res = vmo.poll_write(idx - QUART_FRAMES, &half_frames_buf, &mut counting_wake_cx);
res.expect_pending("should be pending because not enough space");
assert_eq!(count, 2);
}
}