fuchsia_audio_device/
audio_frame_sink.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_inspect as inspect;
6use fuchsia_inspect_derive::{AttachError, Inspect};
7use fuchsia_sync::Mutex;
8use futures::task::{Context, Poll};
9use futures::{io, FutureExt};
10use log::warn;
11use std::pin::Pin;
12use std::sync::Arc;
13
14use crate::frame_vmo;
15use crate::stream_config::{SoftStreamConfig, StreamConfigOrTask};
16use crate::types::{Error, Result};
17
18/// A sink that accepts audio frames to send as input to Fuchsia audio
19/// Usually acquired via SoftStreamConfig::create_input()
20pub struct AudioFrameSink {
21    /// Handle to the VMO that is receiving the frames.
22    frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,
23    /// The index of the next frame we are writing.
24    next_frame_index: usize,
25    /// StreamConfig this is attached to, or the SoftStreamConfig::process_requests task
26    stream_config: StreamConfigOrTask,
27    /// Inspect node
28    inspect: inspect::Node,
29}
30
31impl AudioFrameSink {
32    pub fn new(stream_config: SoftStreamConfig) -> AudioFrameSink {
33        AudioFrameSink {
34            frame_vmo: stream_config.frame_vmo(),
35            next_frame_index: 0,
36            stream_config: StreamConfigOrTask::StreamConfig(stream_config),
37            inspect: Default::default(),
38        }
39    }
40
41    /// Start the requests task if not started, and poll the task.
42    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
43        if let StreamConfigOrTask::Complete = &self.stream_config {
44            return Poll::Ready(Err(Error::InvalidState));
45        }
46        if let StreamConfigOrTask::Task(ref mut task) = &mut self.stream_config {
47            return task.poll_unpin(cx);
48        }
49        self.stream_config.start();
50        self.poll_task(cx)
51    }
52}
53
54impl Inspect for &mut AudioFrameSink {
55    fn iattach(
56        self,
57        parent: &fuchsia_inspect::Node,
58        name: impl AsRef<str>,
59    ) -> core::result::Result<(), AttachError> {
60        self.inspect = parent.create_child(name.as_ref());
61        if let StreamConfigOrTask::StreamConfig(ref mut o) = &mut self.stream_config {
62            return o.iattach(&self.inspect, "soft_stream_config");
63        }
64        Ok(())
65    }
66}
67
68impl io::AsyncWrite for AudioFrameSink {
69    fn poll_write(
70        mut self: Pin<&mut Self>,
71        cx: &mut Context<'_>,
72        buf: &[u8],
73    ) -> Poll<std::result::Result<usize, io::Error>> {
74        if let Poll::Ready(r) = self.poll_task(cx) {
75            self.stream_config = StreamConfigOrTask::Complete;
76            if let Some(error) = r.err() {
77                return Poll::Ready(Err(io::Error::other(error)));
78            } else {
79                return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
80            }
81        }
82        let result = {
83            let mut lock = self.frame_vmo.lock();
84            futures::ready!(lock.poll_write(self.next_frame_index, buf, cx))
85        };
86        match result {
87            Ok((latest, missed)) => {
88                if missed > 0 {
89                    warn!("Couldn't write {missed} frames due to slow writing");
90                }
91                self.next_frame_index = latest;
92                // We always write the whole buffer if it's written
93                Poll::Ready(Ok(buf.len()))
94            }
95            Err(e) => Poll::Ready(Err(io::Error::other(e))),
96        }
97    }
98
99    fn poll_flush(
100        self: Pin<&mut Self>,
101        _cx: &mut Context<'_>,
102    ) -> Poll<std::result::Result<(), io::Error>> {
103        // No buffering is done
104        Poll::Ready(Ok(()))
105    }
106
107    fn poll_close(
108        mut self: Pin<&mut Self>,
109        _cx: &mut Context<'_>,
110    ) -> Poll<std::result::Result<(), io::Error>> {
111        self.stream_config = StreamConfigOrTask::Complete;
112        Poll::Ready(Ok(()))
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl_fuchsia_hardware_audio::*;
    use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat};
    use fixture::fixture;
    use fuchsia_async as fasync;
    use futures::AsyncWriteExt;

    const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
    const TEST_CLOCK_DOMAIN: u32 = 0x00010203;

    /// Test fixture: builds a fake-time executor and a 16-bit, 44.1kHz, two-channel input
    /// stream via `SoftStreamConfig::create_input`, then hands the executor, the
    /// StreamConfig proxy and the frame sink to `test`.
    pub(crate) fn with_audio_frame_sink<F>(_name: &str, test: F)
    where
        F: FnOnce(fasync::TestExecutor, StreamConfigProxy, AudioFrameSink),
    {
        let exec = fasync::TestExecutor::new_with_fake_time();
        let format = PcmFormat {
            pcm_mode: AudioPcmMode::Linear,
            bits_per_sample: 16,
            frames_per_second: 44100,
            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
        };
        let (client, frame_sink) = SoftStreamConfig::create_input(
            TEST_UNIQUE_ID,
            "Google",
            "UnitTest",
            TEST_CLOCK_DOMAIN,
            format,
            zx::MonotonicDuration::from_millis(100),
        )
        .expect("should always build");
        test(exec, client.into_proxy(), frame_sink)
    }

    // End-to-end check of the sink: the first write starts request processing, the stream
    // and ring buffer report the expected properties and formats, writes succeed until the
    // buffer is full, and resume once fake time has advanced.
    #[fixture(with_audio_frame_sink)]
    #[fuchsia::test]
    #[rustfmt::skip]
    fn audio_in(mut exec: fasync::TestExecutor, stream_config: StreamConfigProxy, mut frame_sink: AudioFrameSink) {

        // One second of test "audio" data: a wrapping byte pattern rather than real samples.
        const BYTES_PER_SECOND: usize = 44100 * 2 * 2;  // 44100 frames, 2 bytes per sample, 2
                                                        // channels per frame.
        let mut send_audio = Vec::new();
        let mut pattern_byte: u8 = 0x01;
        send_audio.resize_with(BYTES_PER_SECOND, || {
            pattern_byte = pattern_byte.wrapping_add(2);
            pattern_byte
        });

        let mut next_byte = 0;
        // Sending 10ms packets of the buffer (1764 bytes: 441 frames * 2 bytes per sample * 2
        // channels per frame). The one-second buffer holds exactly 100 such packets, so the
        // wrapping offset below always leaves room for a full packet.
        const TEN_MS_BYTES: usize = 441 * 2 * 2;
        let next_buf = &send_audio[next_byte..next_byte + TEN_MS_BYTES];
        let mut write_fut = frame_sink.write(next_buf);
        // Poll the frame stream, which should start the processing of proxy requests.
        exec.run_until_stalled(&mut write_fut).expect_pending("not started yet");

        let result = exec.run_until_stalled(&mut stream_config.get_properties());
        let props1 = match result {
            Poll::Ready(Ok(v)) => v,
            x => panic!("Expected result to be ready ok, got {x:?}"),
        };

        assert_eq!(props1.unique_id.unwrap(),                *TEST_UNIQUE_ID);
        assert!(props1.is_input.unwrap());
        assert!(!props1.can_mute.unwrap());
        assert!(!props1.can_agc.unwrap());
        assert_eq!(props1.min_gain_db.unwrap(),              0f32);
        assert_eq!(props1.max_gain_db.unwrap(),              0f32);
        assert_eq!(props1.gain_step_db.unwrap(),             0f32);
        assert_eq!(props1.plug_detect_capabilities.unwrap(), PlugDetectCapabilities::Hardwired);
        assert_eq!(props1.manufacturer.unwrap(),             "Google");
        assert_eq!(props1.product.unwrap(),                  "UnitTest");
        assert_eq!(props1.clock_domain.unwrap(),             TEST_CLOCK_DOMAIN);

        let result = exec.run_until_stalled(&mut stream_config.get_supported_formats());
        let formats = match result {
            Poll::Ready(Ok(v)) => v,
            x => panic!("Get supported formats not ready ok: {x:?}"),
        };

        let first = formats.first().expect("supported formats to be present");
        // Take an owned copy: the `unwrap()` calls below move fields out of `pcm`.
        let pcm = first.pcm_supported_formats.to_owned().expect("pcm format to be present");
        assert_eq!(pcm.channel_sets.unwrap()[0].attributes.as_ref().unwrap().len(), 2usize);
        assert_eq!(pcm.sample_formats.unwrap()[0],        SampleFormat::PcmSigned);
        assert_eq!(pcm.bytes_per_sample.unwrap()[0],      2u8);
        assert_eq!(pcm.valid_bits_per_sample.unwrap()[0], 16u8);
        assert_eq!(pcm.frame_rates.unwrap()[0],           44100);

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();

        let format = Format {
            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
                number_of_channels:      2u8,
                sample_format:           SampleFormat::PcmSigned,
                bytes_per_sample:        2u8,
                valid_bits_per_sample:   16u8,
                frame_rate:              44100,
            }),
            ..Default::default()
        };

        stream_config.create_ring_buffer(&format, server).expect("ring buffer error");
        let props2 = match exec.run_until_stalled(&mut ring_buffer.get_properties()) {
            Poll::Ready(Ok(v)) => v,
            x => panic!("expected get_properties to be ready ok: {x:?}"),
        };
        assert_eq!(props2.needs_cache_flush_or_invalidate, Some(false));
        assert!(props2.driver_transfer_bytes.unwrap() > 0);

        const TWO_SEC_FRAMES: u32 = 44100 * 2;

        // Ask for a ring buffer that holds two seconds of audio.
        let result = exec.run_until_stalled(&mut ring_buffer.get_vmo(TWO_SEC_FRAMES, 0));
        let reply = match result {
            Poll::Ready(Ok(Ok(v))) => v,
            other => panic!("ring buffer get vmo error: {other:?}"),
        };
        let audio_vmo = reply.1;

        // Frames * bytes per sample * channels per frame, for two seconds.
        let bytes_per_two_seconds: usize = BYTES_PER_SECOND * 2;
        assert!(
            bytes_per_two_seconds <= audio_vmo.get_size().expect("should always exist after getbuffer") as usize
        );

        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
        let _ = exec.wake_expired_timers();
        // The reported start time should be the fake time that was just set.
        let Poll::Ready(start_result) = exec.run_until_stalled(&mut ring_buffer.start()) else {
            panic!("start error");
        };
        assert_eq!(start_result.expect("start time error"), 42);

        // Writing audio should succeed now. Fill up the buffer.
        let frame_result = exec.run_until_stalled(&mut write_fut).expect("should be ready");
        // Should have written one full 10ms packet (1764 bytes).
        assert_eq!(frame_result.unwrap(), TEN_MS_BYTES);
        let mut write_fut = loop {
            next_byte = (next_byte + TEN_MS_BYTES) % send_audio.len();
            let next_buf = &send_audio[next_byte..next_byte + TEN_MS_BYTES];
            let mut write_fut = frame_sink.write(next_buf);
            match exec.run_until_stalled(&mut write_fut) {
                Poll::Pending => break write_fut,
                Poll::Ready(Ok(len)) => assert_eq!(TEN_MS_BYTES, len),
                x => panic!("Expected writes to succeed until pending, got {x:?} {next_byte}"),
            };
        };

        // Shouldn't be able to write any more until time goes forward.
        exec.run_until_stalled(&mut write_fut).expect_pending("buffer is full");

        // Advance the fake clock by half a second (500ms).
        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(500)));
        let _ = exec.wake_expired_timers();

        // Should be able to write again now.
        let result = exec.run_until_stalled(&mut write_fut).expect("buf isn't full");
        match result {
            Ok(len) => assert_eq!(TEN_MS_BYTES, len),
            Err(x) => panic!("Expected Ok from frame write, got {x:?}"),
        };

        let result = exec.run_until_stalled(&mut ring_buffer.stop());
        assert!(result.is_ready());

        // Watch gain only replies once.
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_gain_state());
        assert!(result.is_pending());

        // Watch plug state only replies once.
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(result.is_ready());
        let result = exec.run_until_stalled(&mut stream_config.watch_plug_state());
        assert!(result.is_pending());
    }
}