1use fuchsia_inspect as inspect;
6use fuchsia_inspect_derive::{AttachError, Inspect};
7use fuchsia_sync::Mutex;
8use futures::task::{Context, Poll};
9use futures::{io, FutureExt};
10use log::warn;
11use std::pin::Pin;
12use std::sync::Arc;
13
14use crate::frame_vmo;
15use crate::stream_config::{SoftStreamConfig, StreamConfigOrTask};
16use crate::types::{Error, Result};
17
18pub struct AudioFrameSink {
21 frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,
23 next_frame_index: usize,
25 stream_config: StreamConfigOrTask,
27 inspect: inspect::Node,
29}
30
31impl AudioFrameSink {
32 pub fn new(stream_config: SoftStreamConfig) -> AudioFrameSink {
33 AudioFrameSink {
34 frame_vmo: stream_config.frame_vmo(),
35 next_frame_index: 0,
36 stream_config: StreamConfigOrTask::StreamConfig(stream_config),
37 inspect: Default::default(),
38 }
39 }
40
41 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
43 if let StreamConfigOrTask::Complete = &self.stream_config {
44 return Poll::Ready(Err(Error::InvalidState));
45 }
46 if let StreamConfigOrTask::Task(ref mut task) = &mut self.stream_config {
47 return task.poll_unpin(cx);
48 }
49 self.stream_config.start();
50 self.poll_task(cx)
51 }
52}
53
54impl Inspect for &mut AudioFrameSink {
55 fn iattach(
56 self,
57 parent: &fuchsia_inspect::Node,
58 name: impl AsRef<str>,
59 ) -> core::result::Result<(), AttachError> {
60 self.inspect = parent.create_child(name.as_ref());
61 if let StreamConfigOrTask::StreamConfig(ref mut o) = &mut self.stream_config {
62 return o.iattach(&self.inspect, "soft_stream_config");
63 }
64 Ok(())
65 }
66}
67
68impl io::AsyncWrite for AudioFrameSink {
69 fn poll_write(
70 mut self: Pin<&mut Self>,
71 cx: &mut Context<'_>,
72 buf: &[u8],
73 ) -> Poll<std::result::Result<usize, io::Error>> {
74 if let Poll::Ready(r) = self.poll_task(cx) {
75 self.stream_config = StreamConfigOrTask::Complete;
76 if let Some(error) = r.err() {
77 return Poll::Ready(Err(io::Error::other(error)));
78 } else {
79 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
80 }
81 }
82 let result = {
83 let mut lock = self.frame_vmo.lock();
84 futures::ready!(lock.poll_write(self.next_frame_index, buf, cx))
85 };
86 match result {
87 Ok((latest, missed)) => {
88 if missed > 0 {
89 warn!("Couldn't write {missed} frames due to slow writing");
90 }
91 self.next_frame_index = latest;
92 Poll::Ready(Ok(buf.len()))
94 }
95 Err(e) => Poll::Ready(Err(io::Error::other(e))),
96 }
97 }
98
99 fn poll_flush(
100 self: Pin<&mut Self>,
101 _cx: &mut Context<'_>,
102 ) -> Poll<std::result::Result<(), io::Error>> {
103 Poll::Ready(Ok(()))
105 }
106
107 fn poll_close(
108 mut self: Pin<&mut Self>,
109 _cx: &mut Context<'_>,
110 ) -> Poll<std::result::Result<(), io::Error>> {
111 self.stream_config = StreamConfigOrTask::Complete;
112 Poll::Ready(Ok(()))
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl_fuchsia_hardware_audio::*;
    use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat};
    use fixture::fixture;
    use fuchsia_async as fasync;
    use futures::AsyncWriteExt;

    const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
    const TEST_CLOCK_DOMAIN: u32 = 0x00010203;

    /// Test fixture: creates a fake-time executor and a 16-bit, 44.1 kHz,
    /// two-channel input `SoftStreamConfig`, then hands the executor, the
    /// stream config proxy and the frame sink to `test`.
    pub(crate) fn with_audio_frame_sink<F>(_name: &str, test: F)
    where
        F: FnOnce(fasync::TestExecutor, StreamConfigProxy, AudioFrameSink),
    {
        let exec = fasync::TestExecutor::new_with_fake_time();
        let format = PcmFormat {
            pcm_mode: AudioPcmMode::Linear,
            bits_per_sample: 16,
            frames_per_second: 44100,
            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
        };
        let (client, frame_sink) = SoftStreamConfig::create_input(
            TEST_UNIQUE_ID,
            "Google",
            "UnitTest",
            TEST_CLOCK_DOMAIN,
            format,
            zx::MonotonicDuration::from_millis(100),
        )
        .expect("should always build");
        test(exec, client.into_proxy(), frame_sink)
    }

    /// End-to-end check of the sink: writes stay pending until the ring buffer
    /// is started, succeed until the buffer fills, and resume after fake time
    /// advances. Also checks the properties and formats the stream config reports.
    #[fixture(with_audio_frame_sink)]
    #[fuchsia::test]
    #[rustfmt::skip]
    fn audio_in(mut exec: fasync::TestExecutor, stream_config: StreamConfigProxy, mut frame_sink: AudioFrameSink) {
        // One second of audio (44100 frames * 2 channels * 2 bytes) filled with
        // a deterministic, wrapping byte pattern.
        let mut send_audio = Vec::new();
        let mut x: u8 = 0x01;
        const BYTES_PER_SECOND: usize = 44100 * 2 * 2;
        send_audio.resize_with(BYTES_PER_SECOND, || {
            x = x.wrapping_add(2);
            x
        });

        let mut next_byte = 0;
        // 441 frames * 2 channels * 2 bytes: ten milliseconds at 44.1 kHz.
        const TEN_MS_BYTES: usize = 441 * 2 * 2;

        // The first write is expected to stay pending until the ring buffer is started.
        let next_buf = &send_audio[next_byte..next_byte + TEN_MS_BYTES];
        let mut write_fut = frame_sink.write(next_buf);
        exec.run_until_stalled(&mut write_fut).expect_pending("not started yet");

        let result = exec.run_until_stalled(&mut stream_config.get_properties());
        let props1 = match result {
            Poll::Ready(Ok(v)) => v,
            x => panic!("Expected result to be ready ok, got {x:?}"),
        };

        assert_eq!(props1.unique_id.unwrap(), *TEST_UNIQUE_ID);
        assert!(props1.is_input.unwrap());
        assert!(!props1.can_mute.unwrap());
        assert!(!props1.can_agc.unwrap());
        assert_eq!(props1.min_gain_db.unwrap(), 0f32);
        assert_eq!(props1.max_gain_db.unwrap(), 0f32);
        assert_eq!(props1.gain_step_db.unwrap(), 0f32);
        assert_eq!(props1.plug_detect_capabilities.unwrap(), PlugDetectCapabilities::Hardwired);
        assert_eq!(props1.manufacturer.unwrap(), "Google");
        assert_eq!(props1.product.unwrap(), "UnitTest");
        assert_eq!(props1.clock_domain.unwrap(), TEST_CLOCK_DOMAIN);

        let result = exec.run_until_stalled(&mut stream_config.get_supported_formats());
        let formats = match result {
            Poll::Ready(Ok(v)) => v,
            x => panic!("Get supported formats not ready ok: {x:?}"),
        };

        let first = formats.first().expect("supported formats to be present");
        // Clone the PCM formats so the fields can be unwrapped by value below.
        let pcm = first.pcm_supported_formats.clone().expect("pcm format to be present");
        assert_eq!(pcm.channel_sets.unwrap()[0].attributes.as_ref().unwrap().len(), 2usize);
        assert_eq!(pcm.sample_formats.unwrap()[0], SampleFormat::PcmSigned);
        assert_eq!(pcm.bytes_per_sample.unwrap()[0], 2u8);
        assert_eq!(pcm.valid_bits_per_sample.unwrap()[0], 16u8);
        assert_eq!(pcm.frame_rates.unwrap()[0], 44100);

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();

        let format = Format {
            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
                number_of_channels: 2u8,
                sample_format: SampleFormat::PcmSigned,
                bytes_per_sample: 2u8,
                valid_bits_per_sample: 16u8,
                frame_rate: 44100,
            }),
            ..Default::default()
        };

        stream_config.create_ring_buffer(&format, server).expect("ring buffer error");
        let props2 = match exec.run_until_stalled(&mut ring_buffer.get_properties()) {
            Poll::Ready(Ok(v)) => v,
            x => panic!("expected get_properties to be ready ok: {x:?}"),
        };
        assert_eq!(props2.needs_cache_flush_or_invalidate, Some(false));
        assert!(props2.driver_transfer_bytes.unwrap() > 0);

        const TWO_SEC_FRAMES: u32 = 44100 * 2;

        let result = exec.run_until_stalled(&mut ring_buffer.get_vmo(TWO_SEC_FRAMES, 0));
        assert!(result.is_ready());
        let reply = match result {
            Poll::Ready(Ok(Ok(v))) => v,
            _ => panic!("ring buffer get vmo error"),
        };
        let audio_vmo = reply.1;

        // The VMO must be able to hold at least the two seconds of frames requested.
        let bytes_per_two_seconds: usize = BYTES_PER_SECOND * 2;
        assert!(
            bytes_per_two_seconds <= audio_vmo.get_size().expect("should always exist after getbuffer") as usize
        );

        // Start the ring buffer at a known fake time and check the reported start time.
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
        let _ = exec.wake_expired_timers();
        match exec.run_until_stalled(&mut ring_buffer.start()) {
            Poll::Ready(s) => assert_eq!(s.expect("start time error"), 42),
            Poll::Pending => panic!("start error"),
        }

        // With the ring buffer started, the write issued at the top completes.
        let frame_result = exec.run_until_stalled(&mut write_fut).expect("should be ready");
        assert_eq!(frame_result.unwrap(), TEN_MS_BYTES);

        // Keep writing ten-millisecond chunks until a write is left pending.
        let mut write_fut = loop {
            next_byte = (next_byte + TEN_MS_BYTES) % send_audio.len();
            let next_buf = &send_audio[next_byte..next_byte + TEN_MS_BYTES];
            let mut write_fut = frame_sink.write(next_buf);
            match exec.run_until_stalled(&mut write_fut) {
                Poll::Pending => break write_fut,
                Poll::Ready(Ok(len)) => assert_eq!(TEN_MS_BYTES, len),
                x => panic!("Expected writes to succeed until pending, got {x:?} {next_byte}"),
            };
        };

        exec.run_until_stalled(&mut write_fut).expect_pending("buffer is full");

        // Advance fake time by 500 ms; the pending write should then complete.
        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(500)));
        let _ = exec.wake_expired_timers();

        let result = exec.run_until_stalled(&mut write_fut).expect("buf isn't full");
        match result {
            Ok(len) => assert_eq!(TEN_MS_BYTES, len),
            Err(x) => panic!("Ok from frame write, got {x:?}"),
        };

        let result = exec.run_until_stalled(&mut ring_buffer.stop());
        assert!(result.is_ready());

        // The first gain-state watch is ready immediately; a second one stays pending.
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(!result.is_ready());

        // Same expectation for the plug state.
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(!result.is_ready());
    }
}