fuchsia_audio_device/
audio_frame_stream.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_inspect as inspect;
6use fuchsia_inspect_derive::{AttachError, Inspect};
7use fuchsia_sync::Mutex;
8use futures::stream::FusedStream;
9use futures::task::{Context, Poll};
10use futures::{FutureExt, Stream};
11use log::info;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use crate::frame_vmo;
16use crate::stream_config::{SoftStreamConfig, StreamConfigOrTask};
17use crate::types::{Error, Result};
18
/// A stream that produces audio frames.
///
/// Each item is a packet of raw bytes (`Result<Vec<u8>>`) read out of the shared frame VMO.
/// Frames are of constant length.
/// Usually acquired via SoftStreamConfig::create_output()
pub struct AudioFrameStream {
    /// Handle to the VMO that is receiving the frames.
    /// Shared behind a mutex; it is locked briefly on each poll to size and fill a packet.
    frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,
    /// The next frame number we should retrieve.
    /// Starts at zero and is advanced to the value reported by each successful read.
    next_frame: usize,
    /// Number of frames to return in a packet.
    packet_frames: usize,
    /// Vector that will be filled with a packet.
    /// Starts empty and is sized on demand once the VMO can report a byte count for
    /// `packet_frames` frames. When the stream produces a packet this vector is handed to
    /// the consumer and replaced with a zero-filled vector of the same length.
    next_packet: std::cell::RefCell<Vec<u8>>,
    /// SoftStreamConfig this is attached to, or the SoftStreamConfig::process_requests task.
    /// Set to `Complete` once that task has finished; the stream is then terminated.
    stream_task: StreamConfigOrTask,
    /// Inspect node. A default node until `iattach` replaces it with a child of the parent.
    inspect: inspect::Node,
}
37
38impl AudioFrameStream {
39    pub fn new(stream: SoftStreamConfig) -> AudioFrameStream {
40        AudioFrameStream {
41            frame_vmo: stream.frame_vmo(),
42            next_frame: 0,
43            packet_frames: stream.packet_frames(),
44            next_packet: Vec::new().into(),
45            stream_task: StreamConfigOrTask::StreamConfig(stream),
46            inspect: Default::default(),
47        }
48    }
49
50    /// Start the requests task if not started, and poll the task.
51    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
52        if let StreamConfigOrTask::Complete = &self.stream_task {
53            return Poll::Ready(Err(Error::InvalidState));
54        }
55        if let StreamConfigOrTask::Task(ref mut task) = &mut self.stream_task {
56            return task.poll_unpin(cx);
57        }
58        self.stream_task.start();
59        self.poll_task(cx)
60    }
61}
62
63impl Stream for AudioFrameStream {
64    type Item = Result<Vec<u8>>;
65
66    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        if let Poll::Ready(r) = self.poll_task(cx) {
68            self.stream_task = StreamConfigOrTask::Complete;
69            return Poll::Ready(r.err().map(Result::Err));
70        }
71        if self.next_packet.borrow().len() == 0 {
72            if let Some(new_len) = self.frame_vmo.lock().bytecount_frames(self.packet_frames) {
73                self.next_packet.borrow_mut().resize(new_len, 0);
74            }
75        }
76        let result = {
77            let mut lock = self.frame_vmo.lock();
78            futures::ready!(lock.poll_read(
79                self.next_frame,
80                self.next_packet.borrow_mut().as_mut_slice(),
81                cx
82            ))
83        };
84
85        match result {
86            Ok((next_frame, missed)) => {
87                if missed > 0 {
88                    info!("Missed {missed} frames due to slow polling");
89                }
90                self.next_frame = next_frame;
91                let vec_mut = self.next_packet.get_mut();
92                let bytes = vec_mut.len();
93                let frames = std::mem::replace(vec_mut, vec![0; bytes]);
94                Poll::Ready(Some(Ok(frames)))
95            }
96            Err(e) => Poll::Ready(Some(Err(e))),
97        }
98    }
99}
100
101impl FusedStream for AudioFrameStream {
102    fn is_terminated(&self) -> bool {
103        match self.stream_task {
104            StreamConfigOrTask::Complete => true,
105            _ => false,
106        }
107    }
108}
109
110impl Inspect for &mut AudioFrameStream {
111    fn iattach(
112        self,
113        parent: &fuchsia_inspect::Node,
114        name: impl AsRef<str>,
115    ) -> core::result::Result<(), AttachError> {
116        self.inspect = parent.create_child(name.as_ref());
117        if let StreamConfigOrTask::StreamConfig(ref mut o) = &mut self.stream_task {
118            return o.iattach(&self.inspect, "stream_config");
119        }
120        Ok(())
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl_fuchsia_hardware_audio::*;
    use fixture::fixture;
    use fuchsia_async as fasync;
    use futures::StreamExt;

    use crate::stream_config::tests::with_audio_frame_stream;

    // Values the stream properties are expected to report.
    // NOTE(review): presumably these match what the `with_audio_frame_stream` fixture
    // configures; confirm against the fixture.
    const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
    const TEST_CLOCK_DOMAIN: u32 = 0x00010203;

    // End-to-end check of an output stream, driven step by step on a test executor with
    // fake time:
    //   1. poll the frame stream once so that stream config requests get processed,
    //   2. check the reported properties and supported formats,
    //   3. create a ring buffer, fetch its VMO and write a known byte pattern into it,
    //   4. start the ring buffer, advance fake time, and check that the frame stream yields
    //      consecutive packets matching the pattern,
    //   5. stop the ring buffer and check that the gain/plug watches reply only once.
    #[fixture(with_audio_frame_stream)]
    #[fuchsia::test]
    #[rustfmt::skip]
    fn soft_audio_out(mut exec: fasync::TestExecutor, stream_config: StreamConfigProxy, mut frame_stream: AudioFrameStream) {
        let mut frame_fut = frame_stream.next();
        // Poll the frame stream, which should start the processing of proxy requests.
        exec.run_until_stalled(&mut frame_fut).expect_pending("no frames yet");

        let result = exec.run_until_stalled(&mut stream_config.get_properties());
        assert!(result.is_ready());
        let props1 = match result {
            Poll::Ready(Ok(v)) => v,
            _ => panic!("stream config get properties error"),
        };

        assert_eq!(props1.unique_id.unwrap(),                *TEST_UNIQUE_ID);
        assert_eq!(props1.is_input.unwrap(),                 false);
        assert_eq!(props1.can_mute.unwrap(),                 false);
        assert_eq!(props1.can_agc.unwrap(),                  false);
        assert_eq!(props1.min_gain_db.unwrap(),              0f32);
        assert_eq!(props1.max_gain_db.unwrap(),              0f32);
        assert_eq!(props1.gain_step_db.unwrap(),             0f32);
        assert_eq!(props1.plug_detect_capabilities.unwrap(), PlugDetectCapabilities::Hardwired);
        assert_eq!(props1.manufacturer.unwrap(),             "Google");
        assert_eq!(props1.product.unwrap(),                  "UnitTest");
        assert_eq!(props1.clock_domain.unwrap(),             TEST_CLOCK_DOMAIN);

        let result = exec.run_until_stalled(&mut stream_config.get_supported_formats());
        assert!(result.is_ready());

        let formats = match result {
            Poll::Ready(Ok(v)) => v,
            _ => panic!("get supported formats error"),
        };

        // Only the first supported format is inspected: 2 channels, 16-bit signed PCM, 44.1 kHz.
        let first = formats.first().to_owned().expect("supported formats to be present");
        let pcm = first.pcm_supported_formats.to_owned().expect("pcm format to be present");
        assert_eq!(pcm.channel_sets.unwrap()[0].attributes.as_ref().unwrap().len(), 2usize);
        assert_eq!(pcm.sample_formats.unwrap()[0],        SampleFormat::PcmSigned);
        assert_eq!(pcm.bytes_per_sample.unwrap()[0],      2u8);
        assert_eq!(pcm.valid_bits_per_sample.unwrap()[0], 16u8);
        assert_eq!(pcm.frame_rates.unwrap()[0],           44100);

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();

        // Request a ring buffer using the same format that was just reported as supported.
        let format = Format {
            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
                number_of_channels:      2u8,
                sample_format:           SampleFormat::PcmSigned,
                bytes_per_sample:        2u8,
                valid_bits_per_sample:   16u8,
                frame_rate:              44100,
            }),
            ..Default::default()
        };

        stream_config.create_ring_buffer(&format, server).expect("ring buffer error");

        let props2 = match exec.run_until_stalled(&mut ring_buffer.get_properties()) {
            Poll::Ready(Ok(v)) => v,
            x => panic!("expected Ready Ok from get_properties, got {:?}", x),
        };
        assert_eq!(props2.needs_cache_flush_or_invalidate, Some(false));
        assert!(props2.driver_transfer_bytes.unwrap() > 0);

        let result = exec.run_until_stalled(&mut ring_buffer.get_vmo(88200, 0)); // 88200 frames at 44100 frames/s = 2 seconds.
        assert!(result.is_ready());
        let reply = match result {
            Poll::Ready(Ok(Ok(v))) => v,
            _ => panic!("ring buffer get vmo error"),
        };
        let audio_vmo = reply.1;

        // Frames per second * bytes per sample * channels per frame.
        let bytes_per_second: usize = 44100 * 2 * 2;
        let vmo_size = audio_vmo.get_size().expect("size after getbuffer");
        assert!(bytes_per_second <= vmo_size as usize);

        // Put "audio" in buffer: one second's worth of a wrapping byte pattern (3, 5, 7, ...),
        // so that each received packet can be compared against a known slice.
        let mut sent_audio = Vec::new();
        let mut x: u8 = 0x01;
        sent_audio.resize_with(bytes_per_second, || {
            x = x.wrapping_add(2);
            x
        });

        assert_eq!(Ok(()), audio_vmo.write(&sent_audio, 0));

        // Start the ring buffer at a known fake time; the reported start time should match it.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
        let _ = exec.wake_expired_timers();
        let start_time = exec.run_until_stalled(&mut ring_buffer.start());
        if let Poll::Ready(s) = start_time {
            assert_eq!(s.expect("start time error"), 42);
        } else {
            panic!("start error");
        }

        exec.run_until_stalled(&mut frame_fut).expect_pending("no frames until time passes");

        // Run the ring buffer for a bit over half a second.
        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(500)));
        let _ = exec.wake_expired_timers();

        let result = exec.run_until_stalled(&mut frame_fut);
        assert!(result.is_ready());
        let audio_recv = match result {
            Poll::Ready(Some(Ok(v))) => v,
            x => panic!("expected Ready Ok from frame stream, got {:?}", x),
        };

        // We should receive exactly 100ms of audio
        let expect_recv_bytes = bytes_per_second / 10;
        assert_eq!(expect_recv_bytes, audio_recv.len());
        assert_eq!(&sent_audio[0..expect_recv_bytes], &audio_recv);

        let result = exec.run_until_stalled(&mut frame_fut);
        assert!(result.is_ready());
        let audio_recv = match result {
            Poll::Ready(Some(Ok(v))) => v,
            x => panic!("expected Ready Ok from frame stream, got {:?}", x),
        };

        // We should receive exactly the next 100ms of audio
        let expect_recv_bytes = bytes_per_second / 10;
        assert_eq!(expect_recv_bytes, audio_recv.len());
        assert_eq!(&sent_audio[expect_recv_bytes..expect_recv_bytes*2], &audio_recv);


        let result = exec.run_until_stalled(&mut ring_buffer.stop());
        assert!(result.is_ready());

        // Watch gain only replies once.
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(!result.is_ready());

        // Watch plug state only replies once.
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(!result.is_ready());
    }
}