fuchsia_audio_device/
frame_vmo.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_hardware_audio::*;
6use fuchsia_async as fasync;
7use futures::task::{Context, Poll, Waker};
8use futures::FutureExt;
9use log::debug;
10use std::pin::Pin;
11use zx::{self as zx, HandleBased};
12
13use crate::stream_config::frames_from_duration;
14use crate::types::{AudioSampleFormat, Error, Result};
15
16/// A FrameVmo wraps a VMO with time tracking.  When a FrameVmo is started, it
17/// assumes that audio frame data is being written to the VMO at the rate specific
18/// in the format it is set to.  Frames that represent a time range can be
19/// retrieved from the buffer.
20pub(crate) struct FrameVmo {
21    /// Ring Buffer VMO. Size zero until the ringbuffer is established.  Shared with
22    /// the AudioFrameStream given back to the client.
23    vmo: zx::Vmo,
24
25    /// Cached size of the ringbuffer, in bytes.  Used to avoid zx_get_size() syscalls.
26    size: usize,
27
28    /// The time that streaming was started.
29    /// Used to calculate the currently available frames.
30    /// None if the stream is not started.
31    start_time: Option<fasync::MonotonicInstant>,
32
33    /// A waker to wake if we have been polled before enough frames are available, or
34    /// before we have been started.
35    waker: Option<Waker>,
36
37    /// A timer which will fire when there are enough frames to return min_duration frames.
38    timer: Option<Pin<Box<fasync::Timer>>>,
39
40    /// The number of frames per second.
41    frames_per_second: u32,
42
43    /// The audio format of the frames.
44    format: Option<AudioSampleFormat>,
45
46    /// Number of bytes per frame, 0 if format is not set.
47    bytes_per_frame: usize,
48
49    /// Frames between notifications. Zero if position notifications aren't enabled
50    frames_between_notifications: usize,
51
52    /// A position responder that will be used to notify when the ringbuffer has been read.
53    /// This will be notified the correct number of times based on the last read position using
54    /// `get_frames`
55    position_responder: Option<RingBufferWatchClockRecoveryPositionInfoResponder>,
56
57    /// The next frame index we are due to notify the position.
58    /// usize::MAX if position notifications are not enabled.
59    next_notify_frame: usize,
60}
61
62impl FrameVmo {
63    pub(crate) fn new() -> Result<FrameVmo> {
64        Ok(FrameVmo {
65            vmo: zx::Vmo::create(0).map_err(|e| Error::IOError(e))?,
66            size: 0,
67            start_time: None,
68            waker: None,
69            frames_per_second: 0,
70            format: None,
71            timer: None,
72            bytes_per_frame: 0,
73            frames_between_notifications: 0,
74            position_responder: None,
75            next_notify_frame: usize::MAX,
76        })
77    }
78
79    /// Set the format of this buffer.   Returns a handle representing the VMO.
80    /// `frames` is the number of frames the VMO should be able to hold.
81    pub(crate) fn set_format(
82        &mut self,
83        frames_per_second: u32,
84        format: AudioSampleFormat,
85        channels: u16,
86        frames: usize,
87        notifications_per_ring: u32,
88    ) -> Result<zx::Vmo> {
89        if self.start_time.is_some() {
90            return Err(Error::InvalidState);
91        }
92        let bytes_per_frame = format.compute_frame_size(channels as usize)?;
93        let new_size = bytes_per_frame * frames;
94        self.vmo = zx::Vmo::create(new_size as u64).map_err(|e| Error::IOError(e))?;
95        self.bytes_per_frame = bytes_per_frame;
96        self.size = new_size;
97        self.format = Some(format);
98        self.frames_per_second = frames_per_second;
99        if notifications_per_ring > 0 {
100            // TODO(https://fxbug.dev/42174677) : consider rounding this up to avoid delivering an extra
101            // notification sometimes.
102            // (and always align the frames notified to the beginning of the buffer)
103            self.frames_between_notifications = frames / notifications_per_ring as usize;
104        }
105        Ok(self.vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(|e| Error::IOError(e))?)
106    }
107
108    pub(crate) fn set_position_responder(
109        &mut self,
110        position_responder: RingBufferWatchClockRecoveryPositionInfoResponder,
111    ) {
112        self.position_responder = Some(position_responder);
113    }
114
115    /// Start the audio clock for the buffer at `time`
116    /// Can't start if the format has not been set.
117    pub(crate) fn start(&mut self, time: fasync::MonotonicInstant) -> Result<()> {
118        if self.start_time.is_some() || self.format.is_none() {
119            return Err(Error::InvalidState);
120        }
121        self.start_time = Some(time);
122        if self.frames_between_notifications > 0 {
123            self.next_notify_frame = 0;
124        }
125        if let Some(w) = self.waker.take() {
126            debug!("ringing the waker to start");
127            w.wake();
128        }
129        Ok(())
130    }
131
132    /// Stop the audio clock in the buffer.
133    /// returns true if the streaming was stopped.
134    pub(crate) fn stop(&mut self) -> Result<bool> {
135        if self.format.is_none() {
136            return Err(Error::InvalidState);
137        }
138        let start_time = self.start_time.take();
139        Ok(start_time.is_some())
140    }
141
142    /// Set the next-available-frame timer to fire `count` frames in the future from now.
143    /// Replaces the currently active timer.
144    /// Returns Poll::Pending if it was set to a time in the future, or Poll::Ready(()) if it would
145    /// be ready now.
146    fn reset_frame_timer(
147        &mut self,
148        deadline: fasync::MonotonicInstant,
149        cx: &mut Context<'_>,
150    ) -> Poll<()> {
151        let mut timer = Box::pin(fasync::Timer::new(deadline));
152        let Poll::Pending = timer.poll_unpin(cx) else {
153            return Poll::Ready(());
154        };
155        self.timer = Some(timer);
156        Poll::Pending
157    }
158
159    /// Calculate the number of bytes that `num` frames will occupy given the current set format.
160    /// Returns None if no format has been set.
161    pub(crate) fn bytecount_frames(&self, num: usize) -> Option<usize> {
162        (self.bytes_per_frame != 0).then_some(self.bytes_per_frame * num)
163    }
164
165    fn poll_start(&mut self, cx: &mut Context<'_>) -> Poll<fasync::MonotonicInstant> {
166        match self.start_time.as_ref() {
167            None => {
168                self.waker = Some(cx.waker().clone());
169                Poll::Pending
170            }
171            Some(time) => Poll::Ready(*time),
172        }
173    }
174
175    /// The number of frames total in the VMO.
176    /// Panics if the format has not been set.
177    fn frames(&self) -> usize {
178        self.size / self.bytes_per_frame
179    }
180
181    /// Index in the vmo of `frame_number`
182    /// Panicks if the format has not been set.
183    fn frame_idx(&self, frame_number: usize) -> usize {
184        frame_number % self.frames()
185    }
186
187    /// Returns a pair (low, high) of the set of frames within the "safe window" where the
188    /// client of the VMO is guaranteed to be able to read/write from, as of `time`.
189    /// The safe frame window advances forward through time.
190    /// TODO(https://fxbug.dev/42073419): currently this just returns the whole buffer
191    /// it should only return the moving window we've reserved for ourselves.
192    fn safe_frames_at(&self, time: fasync::MonotonicInstant) -> (usize, usize) {
193        let youngest_frame_count = self.frames_before(time);
194        let oldest_frame_count =
195            youngest_frame_count.saturating_sub(self.size / self.bytes_per_frame);
196        (oldest_frame_count, youngest_frame_count)
197    }
198
199    /// Notifies the position responder, if it's registered, that we have read or written past the
200    /// next notification point. `frame` is the youngest frame that was touched.
201    fn notify_position_responder(&mut self, frame: usize) {
202        if frame > self.next_notify_frame && self.position_responder.is_some() {
203            let start_time = self.start_time.unwrap();
204            let end_frame_time = start_time + self.duration_from_frames(frame);
205            let notify_frame_bytes = self.frame_idx(frame) * self.bytes_per_frame;
206            let info = RingBufferPositionInfo {
207                timestamp: end_frame_time.into_nanos(),
208                position: notify_frame_bytes as u32,
209            };
210            if let Some(Ok(())) = self.position_responder.take().map(|t| t.send(&info)) {
211                self.next_notify_frame += self.frames_between_notifications;
212            }
213        }
214    }
215
216    /// Gets the length of `buf` in frames. Returns Err(InvalidArgs) if the buffer is not an exact
217    /// number of frames.
218    fn len_in_frames(&self, buf: &[u8]) -> Result<usize> {
219        match buf.len().checked_div(self.bytes_per_frame) {
220            None => Err(Error::InvalidState),
221            Some(0) => Err(Error::InvalidArgs),
222            Some(_) if (buf.len() % self.bytes_per_frame) != 0 => Err(Error::InvalidArgs),
223            Some(len) => Ok(len),
224        }
225    }
226
227    /// Write audio frames starting at `next_frame`.
228    /// `next_frame` is a frame index, starting at zero when the buffer timer was started.
229    /// Reads from `buf` and writes into the VMO as whole frames.
230    /// Returns the index of the next frame to write (can be passed to `next_frame` on next poll),
231    /// and a count of frames that can never be written due to the frames being outside the write
232    /// window (too late writing).
233    /// Those frames provided in the `buf` are ignored, which can result in the entire buffer being
234    /// ignored.
235    /// If there is not enough space to write up to the end of the buffer, Poll::Pending is returned
236    /// and the waker associated with `cx` will be woken at a time when the write pointer has
237    /// advanced far enough to write the whole buffer.
238    /// If called before start, the waker associated with `cx` will be woken when started.
239    /// Returns Err(Error::InvalidArgs) if the buffer size is not a multiple of a positive number
240    /// of frames, or the buffer is larger than the possible available frames in the buffer.
241    /// See `bytecount_frames` to calculate buffer sizes for a count of frames.
242    ///
243    /// Calling poll_write with buffers of varying size is not expected, but should be supported.
244    /// Calling poll_write and poll_read on the same FrameVmo is not expected to work correctly.
245    pub(crate) fn poll_write(
246        &mut self,
247        mut next_frame: usize,
248        mut buf: &[u8],
249        cx: &mut Context<'_>,
250    ) -> Poll<Result<(usize, usize)>> {
251        let start_time = futures::ready!(self.poll_start(cx));
252        let count = self.len_in_frames(buf)?;
253        let total_vmo_frames = self.frames();
254        if count > total_vmo_frames || count == 0 {
255            return Poll::Ready(Err(Error::InvalidArgs));
256        }
257
258        let frame_until = next_frame + count;
259
260        let now = fasync::MonotonicInstant::now();
261        let (_oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
262        // We write _ahead_ of the read pointer.
263        if (current_frame_count + total_vmo_frames) < frame_until {
264            let deadline = start_time + self.duration_from_frames(frame_until - total_vmo_frames);
265            let () = futures::ready!(self.reset_frame_timer(deadline, cx));
266            // Somehow raced to a time where they are available, try again.
267            return self.poll_write(next_frame, buf, cx);
268        }
269
270        let missing_frames = current_frame_count.saturating_sub(next_frame);
271
272        if missing_frames >= count {
273            // Missed all of them. boo underflows.
274            // The oldest frame of the buf is considered to be "already written".
275            self.notify_position_responder(frame_until);
276            return Poll::Ready(Ok((frame_until, count)));
277        } else if missing_frames > 0 {
278            next_frame = current_frame_count;
279            (_, buf) = buf.split_at(missing_frames * self.bytes_per_frame);
280            assert_eq!(frame_until - next_frame, self.len_in_frames(buf).unwrap());
281        }
282
283        let mut frame_from_idx = self.frame_idx(next_frame);
284        let frame_until_idx = self.frame_idx(frame_until);
285        let mut ndx = 0;
286
287        // If we wrap around, write to the end of the VMO, then set up to write the rest.
288        if frame_from_idx >= frame_until_idx {
289            let frames_to_write = total_vmo_frames - frame_from_idx;
290            let bytes_to_write = frames_to_write * self.bytes_per_frame;
291            let byte_start = frame_from_idx * self.bytes_per_frame;
292            self.vmo
293                .write(&buf[0..bytes_to_write], byte_start as u64)
294                .map_err(|e| Error::IOError(e))?;
295            frame_from_idx = 0;
296            ndx = bytes_to_write;
297        }
298
299        let byte_start = frame_from_idx * self.bytes_per_frame;
300
301        self.vmo.write(&buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
302
303        // We're writing frames from just after the `next_frame` up to `frame_until`
304        // Notify if we have a position responder on the first write past the clock notification time.
305        self.notify_position_responder(frame_until);
306        Poll::Ready(Ok((frame_until, missing_frames)))
307    }
308
309    /// Retrieve audio frames available starting at `next_frame`.
310    /// `next_frame` is a frame index, starting at zero when the buffer timer was started.
311    /// Fills `buf` with as many frames as will fill the slice.
312    /// Returns the index of the next frame to read (can be passed to `next_trame` on next poll), and a
313    /// count of frames that were missed due to buffer overwrite (slow polling).
314    /// If the buffer cannot be filled, Poll::Pending is returned and the waker
315    /// associated with `cx` will be woken at a time when enough data is expected to be
316    /// available.
317    /// If polled before start, the waker associate with `cx` will be woken when started.
318    /// Returns Err(Error::InvalidArgs) if the buffer size is not a multiple of a positive number
319    /// of frames, or the buffer is larger than the possible available frames in the buffer.
320    /// See `bytecount_frames` to calculate buffer sizes for a count of frames.
321    ///
322    /// Calling poll_read with buffers of varying size is not expected, but should be supported.
323    pub(crate) fn poll_read(
324        &mut self,
325        mut next_frame: usize,
326        buf: &mut [u8],
327        cx: &mut Context<'_>,
328    ) -> Poll<Result<(usize, usize)>> {
329        let _start_time = futures::ready!(self.poll_start(cx));
330        let count = self.len_in_frames(buf)?;
331        let total_vmo_frames = self.frames();
332        if count > total_vmo_frames || count == 0 {
333            return Poll::Ready(Err(Error::InvalidArgs));
334        }
335
336        let now = fasync::MonotonicInstant::now();
337        let (oldest_frame_count, current_frame_count) = self.safe_frames_at(now);
338
339        let missing_frames = oldest_frame_count.saturating_sub(next_frame);
340
341        if missing_frames > 0 {
342            next_frame = oldest_frame_count;
343        }
344
345        let frames_available = current_frame_count.saturating_sub(next_frame);
346        if frames_available < count {
347            // Set the timer to wake when enough frames will be available
348            let deadline = now + self.duration_from_frames(count - frames_available);
349            let () = futures::ready!(self.reset_frame_timer(deadline, cx));
350            // Somehow raced to a time where they are available, try again.
351            return self.poll_read(next_frame, buf, cx);
352        }
353
354        let mut frame_from_idx = self.frame_idx(next_frame);
355        let frame_until = next_frame + count;
356        let frame_until_idx = self.frame_idx(frame_until);
357        let mut ndx = 0;
358
359        // If we wrap around, read to the end into the buffer, then set up to read the rest.
360        if frame_from_idx >= frame_until_idx {
361            let frames_to_read = total_vmo_frames - frame_from_idx;
362            let bytes_to_read = frames_to_read * self.bytes_per_frame;
363            let byte_start = frame_from_idx * self.bytes_per_frame;
364            self.vmo
365                .read(&mut buf[0..bytes_to_read], byte_start as u64)
366                .map_err(|e| Error::IOError(e))?;
367            frame_from_idx = 0;
368            ndx = bytes_to_read;
369        }
370
371        let byte_start = frame_from_idx * self.bytes_per_frame;
372
373        self.vmo.read(&mut buf[ndx..], byte_start as u64).map_err(|e| Error::IOError(e))?;
374
375        // We're returning frames from just after the `next_frame` up to `frame_until`
376        // Notify if we have a position responder and we read past the clock notification time.
377        self.notify_position_responder(frame_until);
378        Poll::Ready(Ok((frame_until, missing_frames)))
379    }
380
381    /// Count of the number of frames that have ended before `time`.
382    fn frames_before(&self, time: fasync::MonotonicInstant) -> usize {
383        match self.start_time.as_ref() {
384            Some(start) if time > *start => self.frames_from_duration(time - *start) as usize,
385            _ => 0,
386        }
387    }
388
389    /// Frames from duration, based on the current frames per second of the ringbuffer.
390    fn frames_from_duration(&self, duration: fasync::MonotonicDuration) -> usize {
391        frames_from_duration(self.frames_per_second as usize, duration)
392    }
393
394    /// Return an amount of time that guarantees that `frames` frames has passed.
395    /// This means that partial nanoseconds will be rounded up, so that
396    /// [time, time + duration_from_frames(n)] is guaranteed to include n audio frames.
397    /// Only defined for positive numbers of frames.
398    fn duration_from_frames(&self, frames: usize) -> fasync::MonotonicDuration {
399        let fps = self.frames_per_second as i64;
400        let secs = frames as i64 / fps;
401        let leftover_frames = frames as i64 % fps;
402        let nanos = (leftover_frames * 1_000_000_000i64) / fps;
403        let roundup_nano = if 0 != ((leftover_frames * 1_000_000_000i64) % fps) { 1 } else { 0 };
404        zx::MonotonicDuration::from_seconds(secs)
405            + zx::MonotonicDuration::from_nanos(nanos + roundup_nano)
406    }
407}
408
409#[cfg(test)]
410mod tests {
411    use super::*;
412
413    use async_utils::PollExt;
414    use fixture::fixture;
415
416    // Convenience choice because one byte = one frame.
417    const TEST_FORMAT: AudioSampleFormat = AudioSampleFormat::Eight { unsigned: false };
418    const TEST_CHANNELS: u16 = 1;
419    const TEST_FPS: u32 = 48000;
420    const TEST_FRAMES: usize = TEST_FPS as usize / 2;
421    // Duration of the whole VMO.
422    const TEST_VMO_DURATION: fasync::MonotonicDuration =
423        fasync::MonotonicDuration::from_millis(500);
424
425    // At 48kHz, each frame is 20833 and 1/3 nanoseconds. We add one nanosecond
426    // so that the one and two frames are completely within the time.
427    const ONE_FRAME_NANOS: i64 = 20833 + 1;
428    const TWO_FRAME_NANOS: i64 = 20833 * 2 + 1;
429    const THREE_FRAME_NANOS: i64 = 20833 * 3 + 1;
430
431    fn get_test_vmo(frames: usize) -> FrameVmo {
432        let mut vmo = FrameVmo::new().expect("can't make a framevmo");
433        let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, frames, 0).unwrap();
434        vmo
435    }
436
437    fn with_test_vmo<F>(_name: &str, test: F)
438    where
439        F: FnOnce(FrameVmo) -> (),
440    {
441        test(get_test_vmo(TEST_FRAMES))
442    }
443
444    #[fixture(with_test_vmo)]
445    #[fuchsia::test]
446    fn duration_from_frames(vmo: FrameVmo) {
447        assert_eq!(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS), vmo.duration_from_frames(1));
448        assert_eq!(zx::MonotonicDuration::from_nanos(TWO_FRAME_NANOS), vmo.duration_from_frames(2));
449        assert_eq!(
450            zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS),
451            vmo.duration_from_frames(3)
452        );
453
454        assert_eq!(
455            zx::MonotonicDuration::from_seconds(1),
456            vmo.duration_from_frames(TEST_FPS as usize)
457        );
458
459        assert_eq!(zx::MonotonicDuration::from_millis(1500), vmo.duration_from_frames(72000));
460    }
461
462    #[fixture(with_test_vmo)]
463    #[fuchsia::test]
464    fn frames_from_duration(vmo: FrameVmo) {
465        assert_eq!(0, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(0)));
466
467        assert_eq!(
468            0,
469            vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS - 1))
470        );
471        assert_eq!(1, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS)));
472
473        // Three frames is an exact number of nanoseconds, testing the edge.
474        assert_eq!(
475            2,
476            vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1))
477        );
478        assert_eq!(
479            3,
480            vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
481        );
482        assert_eq!(
483            3,
484            vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS + 1))
485        );
486
487        assert_eq!(
488            TEST_FPS as usize,
489            vmo.frames_from_duration(zx::MonotonicDuration::from_seconds(1))
490        );
491        assert_eq!(72000, vmo.frames_from_duration(zx::MonotonicDuration::from_millis(1500)));
492
493        assert_eq!(10660, vmo.frames_from_duration(zx::MonotonicDuration::from_nanos(222084000)));
494    }
495
496    #[fuchsia::test]
497    fn start_stop() {
498        let _exec = fasync::TestExecutor::new();
499
500        let mut vmo = FrameVmo::new().expect("can't make a framevmo");
501
502        // Starting before set_format is an error.
503        assert!(vmo.start(fasync::MonotonicInstant::now()).is_err());
504        // Stopping before set_format is an error.
505        assert!(vmo.stop().is_err());
506
507        let _handle = vmo.set_format(TEST_FPS, TEST_FORMAT, TEST_CHANNELS, TEST_FRAMES, 0).unwrap();
508
509        let start_time = fasync::MonotonicInstant::now();
510        vmo.start(start_time).unwrap();
511        match vmo.start(start_time) {
512            Err(Error::InvalidState) => {}
513            x => panic!("Expected Err(InvalidState) from double start but got {:?}", x),
514        };
515
516        assert_eq!(true, vmo.stop().unwrap());
517        // stop is idempotent, but will return false if it was already stopped
518        assert_eq!(false, vmo.stop().unwrap());
519
520        vmo.start(start_time).unwrap();
521    }
522
523    fn frames_before_exact(
524        vmo: &mut FrameVmo,
525        time_nanos: i64,
526        duration: zx::MonotonicDuration,
527        frames: usize,
528    ) {
529        let _ = vmo.stop();
530        let start_time = fasync::MonotonicInstant::from_nanos(time_nanos);
531        vmo.start(start_time).unwrap();
532        assert_eq!(frames, vmo.frames_before(start_time + duration));
533    }
534
535    #[fixture(with_test_vmo)]
536    #[fuchsia::test]
537    fn frames_before(mut vmo: FrameVmo) {
538        let _exec = fasync::TestExecutor::new();
539
540        let start_time = fasync::MonotonicInstant::now();
541        vmo.start(start_time).unwrap();
542
543        assert_eq!(0, vmo.frames_before(start_time));
544
545        assert_eq!(
546            1,
547            vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS))
548        );
549        assert_eq!(
550            2,
551            vmo.frames_before(
552                start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1)
553            )
554        );
555        assert_eq!(
556            3,
557            vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
558        );
559
560        assert_eq!(
561            TEST_FPS as usize / 4,
562            vmo.frames_before(start_time + zx::MonotonicDuration::from_millis(250))
563        );
564
565        let three_quarters_dur = zx::MonotonicDuration::from_millis(375);
566        assert_eq!(3 * TEST_FPS as usize / 8, vmo.frames_before(start_time + three_quarters_dur));
567
568        assert_eq!(
569            10521,
570            vmo.frames_before(start_time + zx::MonotonicDuration::from_nanos(219188000))
571        );
572
573        frames_before_exact(
574            &mut vmo,
575            273533747037,
576            zx::MonotonicDuration::from_nanos(219188000),
577            10521,
578        );
579        frames_before_exact(
580            &mut vmo,
581            714329925362,
582            zx::MonotonicDuration::from_nanos(219292000),
583            10526,
584        );
585    }
586
587    #[fixture(with_test_vmo)]
588    #[fuchsia::test]
589    fn poll_read(mut vmo: FrameVmo) {
590        let exec = fasync::TestExecutor::new_with_fake_time();
591        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
592
593        let start_time = fasync::MonotonicInstant::now();
594        vmo.start(start_time).unwrap();
595
596        let half_dur = TEST_VMO_DURATION / 2;
597        const QUART_FRAMES: usize = TEST_FRAMES / 4;
598        let mut quart_frames_buf = [0; QUART_FRAMES];
599        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
600
601        let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
602
603        let res = vmo.poll_read(0, &mut quart_frames_buf, &mut no_wake_cx);
604        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
605
606        assert_eq!(0, missed);
607        // index returned should be equal to the number of frames returned minus 1 - zero is the
608        // first frame
609        assert_eq!(frame_idx, QUART_FRAMES);
610
611        // After another half_dur, we should be able to read the whole buffer from start to finish.
612        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
613
614        let mut full_buf = [0; TEST_FRAMES];
615        let res = vmo.poll_read(0, &mut full_buf, &mut no_wake_cx);
616        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
617
618        assert_eq!(0, missed);
619        // index returned should be equal to the number of frames returned minus 1 (zero indexing)
620        assert_eq!(frame_idx, TEST_FRAMES);
621
622        // Each `half_dur` period should pseudo-fill half the vmo.
623        // After three halves, we should have the oldest frame half-way through the buffer (at
624        // index 12000)
625        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
626
627        // Should be able to get frames that span from the middle of the buffer to the middle of the
628        // buffer (from index 12000 to index 11999).  This should be 24000 frames.
629        // TODO(https://fxbug.dev/42171752): should mark the buffer somehow to confirm that the data is correct
630        let res = vmo.poll_read(TEST_FRAMES / 2, &mut full_buf, &mut no_wake_cx);
631        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
632
633        assert_eq!(0, missed);
634        assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
635
636        // Should be able to get a set of frames that is all located before the oldest point in the
637        // VMO, which is currently in the middle of the VMO.
638        // This should be from about a quarter in to halfway in (now)
639        // Should be able to get exactly the min_duration amount of frames.
640        let res = vmo.poll_read(frame_idx - QUART_FRAMES, &mut quart_frames_buf, &mut no_wake_cx);
641        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
642
643        assert_eq!(0, missed);
644        assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES / 2);
645    }
646
647    #[fixture(with_test_vmo)]
648    #[fuchsia::test]
649    fn poll_read_waking(mut vmo: FrameVmo) {
650        let mut exec = fasync::TestExecutor::new_with_fake_time();
651        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
652
653        let half_dur = TEST_VMO_DURATION / 2;
654        const QUART_FRAMES: usize = TEST_FRAMES / 4;
655        let mut quart_frames_buf = [0; QUART_FRAMES];
656        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
657
658        let (waker, count) = futures_test::task::new_count_waker();
659        let mut counting_wake_cx = Context::from_waker(&waker);
660
661        // Polled before start, should wake the waker when started.
662        let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
663        res.expect_pending("should be pending before start");
664
665        let start_time = fasync::MonotonicInstant::now();
666        vmo.start(start_time).unwrap();
667
668        // Woken when we start.
669        assert_eq!(count, 1);
670
671        // Polling before frames are ready should return pending, and set a timer.
672        let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
673        res.expect_pending("should be pending before start");
674
675        // Each `half_dur` period should pseudo-fill half the vmo.
676        // After a half_dur, we should have been woken.
677        let new_time = fasync::MonotonicInstant::after(half_dur);
678        exec.set_fake_time(new_time);
679        assert!(exec.wake_expired_timers());
680
681        assert_eq!(count, 2);
682
683        // Should be ready now, with half the duration in frames available.
684        let res = vmo.poll_read(0, &mut quart_frames_buf[..], &mut counting_wake_cx);
685        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
686        assert_eq!(0, missed);
687        assert_eq!(frame_idx, QUART_FRAMES);
688
689        // Polling again with more frames than we have should return pending because there
690        // isn't enough data available.
691        let mut half_frames_buf = [0; TEST_FRAMES / 2];
692        let res = vmo.poll_read(frame_idx, &mut half_frames_buf, &mut counting_wake_cx);
693        res.expect_pending("should be pending because not enough data");
694
695        assert_eq!(count, 2);
696    }
697
698    #[fuchsia::test]
699    fn multibyte_poll_read() {
700        let exec = fasync::TestExecutor::new_with_fake_time();
701        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
702        let mut vmo = FrameVmo::new().expect("can't make a framevmo");
703        let format = AudioSampleFormat::Sixteen { unsigned: false, invert_endian: false };
704        let frames = TEST_FPS as usize / 2;
705
706        // No byte count can be determined before the format is set.
707        assert!(vmo.bytecount_frames(10).is_none());
708
709        let _handle = vmo.set_format(TEST_FPS, format, 2, frames, 0).unwrap();
710
711        let half_dur = zx::MonotonicDuration::from_millis(250);
712
713        // Start in the past so we can be sure the frames are in the past.
714        let start_time = fasync::MonotonicInstant::now() - half_dur;
715        vmo.start(start_time).unwrap();
716
717        let bytecount =
718            vmo.bytecount_frames(frames / 2).expect("a bytecount should exist after format");
719        let mut half_frames_buf = vec![0; bytecount];
720
721        let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
722        let res = vmo.poll_read(0, half_frames_buf.as_mut_slice(), &mut no_wake_cx);
723        let (idx, missed) = res.expect("frames should be ready").expect("no error");
724
725        assert_eq!(0, missed);
726        // Still frames are in frame counts, not byte counts.
727        assert_eq!(idx, frames / 2);
728    }
729
730    #[fixture(with_test_vmo)]
731    #[fuchsia::test]
732    fn poll_read_boundaries(mut vmo: FrameVmo) {
733        let exec = fasync::TestExecutor::new_with_fake_time();
734        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
735        let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
736
737        let start_time = fasync::MonotonicInstant::now();
738
739        vmo.start(start_time).unwrap();
740
741        // Just before the frame finishes, we shouldn't be able to get it.
742        exec.set_fake_time(
743            start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS)
744                - zx::MonotonicDuration::from_nanos(1),
745        );
746        let mut one_frame_buf = [0; 1];
747        vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx)
748            .expect_pending("third frame shouldn't be ready");
749        // Exactly when the frame finishes, should be able to get the frame.
750        exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS));
751        let res = vmo.poll_read(2, &mut one_frame_buf, &mut no_wake_cx);
752        let (idx, missed) = res.expect("third frame should be ready").expect("no error");
753
754        assert_eq!(0, missed);
755        // index is the index of the next frame to write, which should be the frame index we requested + 1
756        assert_eq!(3, idx);
757
758        // a bunch of time has passed, let's get one frame again.
759        let much_later_ns = 3999 * THREE_FRAME_NANOS;
760        let next_frame_idx = 3998 * 3 + 2;
761        exec.set_fake_time(start_time + zx::MonotonicDuration::from_nanos(much_later_ns));
762        let res = vmo.poll_read(next_frame_idx, &mut one_frame_buf, &mut no_wake_cx);
763        let (idx, missed) = res.expect("frame should be ready").expect("no error");
764        assert_eq!(0, missed);
765        // index is the index of the next frame
766        assert_eq!(next_frame_idx + 1, idx);
767
768        let mut all_frames_len = 0;
769        let mut total_duration = zx::MonotonicDuration::from_nanos(0);
770
771        let moment_length = zx::MonotonicDuration::from_nanos(10_000);
772
773        let mut moment_start = start_time;
774        let mut moment_end = moment_start;
775        let mut next_index = 0;
776
777        let mut ten_frames_buf = [0; 10];
778
779        while total_duration < zx::MonotonicDuration::from_millis(250) {
780            moment_end += moment_length;
781            exec.set_fake_time(moment_end);
782            total_duration += moment_length;
783            // the cx here will never be woken since we never wake timers
784            let Poll::Ready(res) = vmo.poll_read(next_index, &mut ten_frames_buf, &mut no_wake_cx)
785            else {
786                continue;
787            };
788            let (last_idx, missed) = res.expect("no error");
789            assert_eq!(0, missed);
790            all_frames_len += ten_frames_buf.len();
791            assert_eq!(
792                all_frames_len,
793                vmo.frames_before(moment_end + zx::MonotonicDuration::from_nanos(1)),
794                "frame miscount after {:?} - {:?} moment",
795                moment_start,
796                moment_end
797            );
798            moment_start = moment_end;
799            next_index = last_idx;
800        }
801
802        assert_eq!(TEST_FRAMES / 2, all_frames_len, "should be a quarter second worth of frames");
803    }
804
805    #[fixture(with_test_vmo)]
806    #[fuchsia::test]
807    fn poll_write(mut vmo: FrameVmo) {
808        let exec = fasync::TestExecutor::new_with_fake_time();
809        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
810
811        let start_time = fasync::MonotonicInstant::now();
812        vmo.start(start_time).unwrap();
813
814        let half_dur = TEST_VMO_DURATION / 2;
815        const QUART_FRAMES: usize = TEST_FRAMES / 4;
816        let quart_frames_buf = [0; QUART_FRAMES];
817
818        let mut no_wake_cx = Context::from_waker(futures_test::task::panic_waker_ref());
819
820        // Immediately after starting we should be able to write.
821        let res = vmo.poll_write(0, &quart_frames_buf, &mut no_wake_cx);
822        let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
823
824        assert_eq!(0, missed);
825        // index returned should be equal to the number of frames written - zero indexed
826        // frame numbers.
827        assert_eq!(frame_idx, QUART_FRAMES);
828
829        // After a quarter_duration, we should be able to write to the whole buffer
830        // TODO(https://fxbug.dev/42073419): this should only be the safe space and not the whole
831        // buffer, there is a no-go-space just past the read pointer.
832        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur / 2));
833
834        let full_buf = [0; TEST_FRAMES];
835        let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
836        let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
837
838        assert_eq!(0, missed);
839        // index returned should be equal to the number of frames returned minus 1 (zero indexing)
840        assert_eq!(frame_idx, TEST_FRAMES + QUART_FRAMES);
841
842        // Each `half_dur` period should "read past" half the vmo.
843        // After five quarters duration, the read pointer should be at FRAMES + QUART_FRAMES.
844        // (vmo index 12000)
845        exec.set_fake_time(fasync::MonotonicInstant::after(TEST_VMO_DURATION));
846
847        // Should be able to write frames frames that span from the quarter of the buffer to the
848        // read pointer again (from index 6000 to index 5999).  This should be 24000 frames.
849        // TODO(https://fxbug.dev/42171752): should mark the buffer somehow to confirm that the data is correct
850        let res = vmo.poll_write(frame_idx, &full_buf, &mut no_wake_cx);
851        let (frame_idx, missed) = res.expect("frames should be ready").expect("no error");
852
853        assert_eq!(0, missed);
854        assert_eq!(frame_idx, TEST_FRAMES * 2 + QUART_FRAMES);
855
856        // Should be able to write set of frames that is all located before the youngest point in the
857        // VMO, which is currently a quarter way through the vmo.
858        // This is from zero to a quarter in.
859        let res = vmo.poll_write(TEST_FRAMES + TEST_FRAMES, &quart_frames_buf, &mut no_wake_cx);
860        let (frame_idx, missed) = res.expect("frames should be writable").expect("no error");
861
862        assert_eq!(0, missed);
863        assert_eq!(frame_idx, TEST_FRAMES + TEST_FRAMES + QUART_FRAMES);
864    }
865
866    #[fixture(with_test_vmo)]
867    #[fuchsia::test]
868    fn poll_write_waking(mut vmo: FrameVmo) {
869        let mut exec = fasync::TestExecutor::new_with_fake_time();
870        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000_000));
871
872        let half_dur = TEST_VMO_DURATION / 2;
873        const QUART_FRAMES: usize = TEST_FRAMES / 4;
874        let quart_frames_buf = [0; QUART_FRAMES];
875        exec.set_fake_time(fasync::MonotonicInstant::after(half_dur));
876
877        let (waker, count) = futures_test::task::new_count_waker();
878        let mut counting_wake_cx = Context::from_waker(&waker);
879
880        // Polled before start, should wake the waker when started.
881        let res = vmo.poll_write(0, &quart_frames_buf, &mut counting_wake_cx);
882        res.expect_pending("should be pending before start");
883
884        let start_time = fasync::MonotonicInstant::now();
885        vmo.start(start_time).unwrap();
886
887        // Woken when we start.
888        assert_eq!(count, 1);
889
890        // Should be able to write a whole buffer right after start.
891        // TODO(https://fxbug.dev/42073419): this should only be the safe space and not the whole
892        // buffer, there is a no-go-space just past the read pointer.
893        let mut idx = 0;
894        for _ in 0..4 {
895            let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
896            let (new_idx, missed) = res.expect("should be writable just after start").expect("ok");
897            assert_eq!(0, missed);
898            assert!(new_idx > idx);
899            idx = new_idx;
900        }
901
902        // After a full buffer write, we should be pending
903        let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
904        res.expect_pending("should have to wait for time to go by");
905
906        // Each `half_dur` period should pseudo-read half the vmo.
907        // After a half_dur, we should have been woken.
908        let new_time = fasync::MonotonicInstant::after(half_dur);
909        exec.set_fake_time(new_time);
910        assert!(exec.wake_expired_timers());
911
912        assert_eq!(count, 2);
913
914        // Should be ready now, and be able to write twice.
915        for _ in 0..2 {
916            let res = vmo.poll_write(idx, &quart_frames_buf, &mut counting_wake_cx);
917            let (new_idx, missed) = res.expect("frames writable").expect("ok");
918            assert_eq!(0, missed);
919            assert!(new_idx >= idx);
920            idx = new_idx;
921        }
922
923        // Polling again with more frames should return pending because there
924        // isn't enough data available, even if we start in the "past"
925        let half_frames_buf = [0; TEST_FRAMES / 2];
926        let res = vmo.poll_write(idx - QUART_FRAMES, &half_frames_buf, &mut counting_wake_cx);
927        res.expect_pending("should be pending because not enough space");
928
929        assert_eq!(count, 2);
930    }
931}