1use fuchsia_inspect as inspect;
6use fuchsia_inspect_derive::{AttachError, Inspect};
7use fuchsia_sync::Mutex;
8use futures::stream::FusedStream;
9use futures::task::{Context, Poll};
10use futures::{FutureExt, Stream};
11use log::info;
12use std::pin::Pin;
13use std::sync::Arc;
14
15use crate::frame_vmo;
16use crate::stream_config::{SoftStreamConfig, StreamConfigOrTask};
17use crate::types::{Error, Result};
18
19pub struct AudioFrameStream {
23 frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,
25 next_frame: usize,
27 packet_frames: usize,
29 next_packet: std::cell::RefCell<Vec<u8>>,
32 stream_task: StreamConfigOrTask,
34 inspect: inspect::Node,
36}
37
38impl AudioFrameStream {
39 pub fn new(stream: SoftStreamConfig) -> AudioFrameStream {
40 AudioFrameStream {
41 frame_vmo: stream.frame_vmo(),
42 next_frame: 0,
43 packet_frames: stream.packet_frames(),
44 next_packet: Vec::new().into(),
45 stream_task: StreamConfigOrTask::StreamConfig(stream),
46 inspect: Default::default(),
47 }
48 }
49
50 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
52 if let StreamConfigOrTask::Complete = &self.stream_task {
53 return Poll::Ready(Err(Error::InvalidState));
54 }
55 if let StreamConfigOrTask::Task(ref mut task) = &mut self.stream_task {
56 return task.poll_unpin(cx);
57 }
58 self.stream_task.start();
59 self.poll_task(cx)
60 }
61}
62
63impl Stream for AudioFrameStream {
64 type Item = Result<Vec<u8>>;
65
66 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 if let Poll::Ready(r) = self.poll_task(cx) {
68 self.stream_task = StreamConfigOrTask::Complete;
69 return Poll::Ready(r.err().map(Result::Err));
70 }
71 if self.next_packet.borrow().len() == 0 {
72 if let Some(new_len) = self.frame_vmo.lock().bytecount_frames(self.packet_frames) {
73 self.next_packet.borrow_mut().resize(new_len, 0);
74 }
75 }
76 let result = {
77 let mut lock = self.frame_vmo.lock();
78 futures::ready!(lock.poll_read(
79 self.next_frame,
80 self.next_packet.borrow_mut().as_mut_slice(),
81 cx
82 ))
83 };
84
85 match result {
86 Ok((next_frame, missed)) => {
87 if missed > 0 {
88 info!("Missed {missed} frames due to slow polling");
89 }
90 self.next_frame = next_frame;
91 let vec_mut = self.next_packet.get_mut();
92 let bytes = vec_mut.len();
93 let frames = std::mem::replace(vec_mut, vec![0; bytes]);
94 Poll::Ready(Some(Ok(frames)))
95 }
96 Err(e) => Poll::Ready(Some(Err(e))),
97 }
98 }
99}
100
101impl FusedStream for AudioFrameStream {
102 fn is_terminated(&self) -> bool {
103 match self.stream_task {
104 StreamConfigOrTask::Complete => true,
105 _ => false,
106 }
107 }
108}
109
110impl Inspect for &mut AudioFrameStream {
111 fn iattach(
112 self,
113 parent: &fuchsia_inspect::Node,
114 name: impl AsRef<str>,
115 ) -> core::result::Result<(), AttachError> {
116 self.inspect = parent.create_child(name.as_ref());
117 if let StreamConfigOrTask::StreamConfig(ref mut o) = &mut self.stream_task {
118 return o.iattach(&self.inspect, "stream_config");
119 }
120 Ok(())
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127
128 use async_utils::PollExt;
129 use fidl_fuchsia_hardware_audio::*;
130 use fixture::fixture;
131 use fuchsia_async as fasync;
132 use futures::StreamExt;
133
134 use crate::stream_config::tests::with_audio_frame_stream;
135
136 const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
137 const TEST_CLOCK_DOMAIN: u32 = 0x00010203;
138
139 #[fixture(with_audio_frame_stream)]
140 #[fuchsia::test]
141 #[rustfmt::skip]
142 fn soft_audio_out(mut exec: fasync::TestExecutor, stream_config: StreamConfigProxy, mut frame_stream: AudioFrameStream) {
143 let mut frame_fut = frame_stream.next();
144 exec.run_until_stalled(&mut frame_fut).expect_pending("no frames yet");
146
147 let result = exec.run_until_stalled(&mut stream_config.get_properties());
148 assert!(result.is_ready());
149 let props1 = match result {
150 Poll::Ready(Ok(v)) => v,
151 _ => panic!("stream config get properties error"),
152 };
153
154 assert_eq!(props1.unique_id.unwrap(), *TEST_UNIQUE_ID);
155 assert_eq!(props1.is_input.unwrap(), false);
156 assert_eq!(props1.can_mute.unwrap(), false);
157 assert_eq!(props1.can_agc.unwrap(), false);
158 assert_eq!(props1.min_gain_db.unwrap(), 0f32);
159 assert_eq!(props1.max_gain_db.unwrap(), 0f32);
160 assert_eq!(props1.gain_step_db.unwrap(), 0f32);
161 assert_eq!(props1.plug_detect_capabilities.unwrap(), PlugDetectCapabilities::Hardwired);
162 assert_eq!(props1.manufacturer.unwrap(), "Google");
163 assert_eq!(props1.product.unwrap(), "UnitTest");
164 assert_eq!(props1.clock_domain.unwrap(), TEST_CLOCK_DOMAIN);
165
166 let result = exec.run_until_stalled(&mut stream_config.get_supported_formats());
167 assert!(result.is_ready());
168
169 let formats = match result {
170 Poll::Ready(Ok(v)) => v,
171 _ => panic!("get supported formats error"),
172 };
173
174 let first = formats.first().to_owned().expect("supported formats to be present");
175 let pcm = first.pcm_supported_formats.to_owned().expect("pcm format to be present");
176 assert_eq!(pcm.channel_sets.unwrap()[0].attributes.as_ref().unwrap().len(), 2usize);
177 assert_eq!(pcm.sample_formats.unwrap()[0], SampleFormat::PcmSigned);
178 assert_eq!(pcm.bytes_per_sample.unwrap()[0], 2u8);
179 assert_eq!(pcm.valid_bits_per_sample.unwrap()[0], 16u8);
180 assert_eq!(pcm.frame_rates.unwrap()[0], 44100);
181
182 let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();
183
184 let format = Format {
185 pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
186 number_of_channels: 2u8,
187 sample_format: SampleFormat::PcmSigned,
188 bytes_per_sample: 2u8,
189 valid_bits_per_sample: 16u8,
190 frame_rate: 44100,
191 }),
192 ..Default::default()
193 };
194
195 stream_config.create_ring_buffer(&format, server).expect("ring buffer error");
196
197 let props2 = match exec.run_until_stalled(&mut ring_buffer.get_properties()) {
198 Poll::Ready(Ok(v)) => v,
199 x => panic!("expected Ready Ok from get_properties, got {:?}", x),
200 };
201 assert_eq!(props2.needs_cache_flush_or_invalidate, Some(false));
202 assert!(props2.driver_transfer_bytes.unwrap() > 0);
203
204 let result = exec.run_until_stalled(&mut ring_buffer.get_vmo(88200, 0)); assert!(result.is_ready());
206 let reply = match result {
207 Poll::Ready(Ok(Ok(v))) => v,
208 _ => panic!("ring buffer get vmo error"),
209 };
210 let audio_vmo = reply.1;
211
212 let bytes_per_second: usize = 44100 * 2 * 2;
214 let vmo_size = audio_vmo.get_size().expect("size after getbuffer");
215 assert!(bytes_per_second <= vmo_size as usize);
216
217 let mut sent_audio = Vec::new();
219 let mut x: u8 = 0x01;
220 sent_audio.resize_with(bytes_per_second, || {
221 x = x.wrapping_add(2);
222 x
223 });
224
225 assert_eq!(Ok(()), audio_vmo.write(&sent_audio, 0));
226
227 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
228 let _ = exec.wake_expired_timers();
229 let start_time = exec.run_until_stalled(&mut ring_buffer.start());
230 if let Poll::Ready(s) = start_time {
231 assert_eq!(s.expect("start time error"), 42);
232 } else {
233 panic!("start error");
234 }
235
236 exec.run_until_stalled(&mut frame_fut).expect_pending("no frames until time passes");
237
238 exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(500)));
240 let _ = exec.wake_expired_timers();
241
242 let result = exec.run_until_stalled(&mut frame_fut);
243 assert!(result.is_ready());
244 let audio_recv = match result {
245 Poll::Ready(Some(Ok(v))) => v,
246 x => panic!("expected Ready Ok from frame stream, got {:?}", x),
247 };
248
249 let expect_recv_bytes = bytes_per_second / 10;
251 assert_eq!(expect_recv_bytes, audio_recv.len());
252 assert_eq!(&sent_audio[0..expect_recv_bytes], &audio_recv);
253
254 let result = exec.run_until_stalled(&mut frame_fut);
255 assert!(result.is_ready());
256 let audio_recv = match result {
257 Poll::Ready(Some(Ok(v))) => v,
258 x => panic!("expected Ready Ok from frame stream, got {:?}", x),
259 };
260
261 let expect_recv_bytes = bytes_per_second / 10;
263 assert_eq!(expect_recv_bytes, audio_recv.len());
264 assert_eq!(&sent_audio[expect_recv_bytes..expect_recv_bytes*2], &audio_recv);
265
266
267 let result = exec.run_until_stalled(&mut ring_buffer.stop());
268 assert!(result.is_ready());
269
270 let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
272 assert!(result.is_ready());
273 let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
274 assert!(!result.is_ready());
275
276 let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
278 assert!(result.is_ready());
279 let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
280 assert!(!result.is_ready());
281 }
282}