bt_hfp/audio/
inband.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use fuchsia_audio_codec::{StreamProcessor, StreamProcessorOutputStream};
7use fuchsia_audio_device::stream_config::SoftStreamConfig;
8use fuchsia_audio_device::{AudioFrameSink, AudioFrameStream};
9use fuchsia_bluetooth::types::{PeerId, peer_audio_stream_id};
10use fuchsia_sync::Mutex;
11use futures::stream::BoxStream;
12use futures::task::Context;
13use futures::{AsyncWriteExt, FutureExt, StreamExt};
14use log::{error, info, warn};
15use media::AudioDeviceEnumeratorProxy;
16use std::pin::pin;
17use {fidl_fuchsia_bluetooth_bredr as bredr, fidl_fuchsia_media as media, fuchsia_async as fasync};
18
19use crate::audio::{Control, ControlEvent, Error, HF_INPUT_UUID, HF_OUTPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
/// Audio Control for inband audio, i.e. encoding and decoding audio before sending it
/// to the controller via HCI (in contrast to offloaded audio).
pub struct InbandControl {
    /// Proxy used to register the input and output audio devices of a session.
    audio_core: media::AudioDeviceEnumeratorProxy,
    /// Peer and task of the session that was started most recently, or `None` when there is no
    /// session task to track.
    session_task: Option<(PeerId, fasync::Task<()>)>,
    /// Sending half of the event channel. A clone is handed to every session so that it can
    /// report when it stops.
    event_sender: Mutex<futures::channel::mpsc::Sender<ControlEvent>>,
    /// Receiving half of the event channel, kept here until `take_events` hands it out.
    stream: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
}
31
// Setup for a running AudioSession.
// AudioSession::run() consumes the session and should handle the data path in both directions:
//   - SCO -> decoder -> audio_core input (audio_frame_sink)
//   - audio_core output (audio_frame_stream) -> encoder -> SCO
struct AudioSession {
    // Where decoded audio is written.
    audio_frame_sink: AudioFrameSink,
    // Where the PCM audio that gets encoded is read from.
    audio_frame_stream: AudioFrameStream,
    // The SCO connection that carries the encoded audio to and from the peer.
    sco: sco::Connection,
    // Codec used on the SCO connection; selects the packet framing.
    codec: CodecId,
    // Decodes audio read from the SCO connection.
    decoder: StreamProcessor,
    // Encodes audio before it is written to the SCO connection.
    encoder: StreamProcessor,
    // Used to report that the session has stopped.
    event_sender: futures::channel::mpsc::Sender<ControlEvent>,
}
45
46impl AudioSession {
47    fn setup(
48        connection: sco::Connection,
49        codec: CodecId,
50        audio_frame_sink: AudioFrameSink,
51        audio_frame_stream: AudioFrameStream,
52        event_sender: futures::channel::mpsc::Sender<ControlEvent>,
53    ) -> Result<Self, Error> {
54        if !codec.is_supported() {
55            return Err(Error::UnsupportedParameters {
56                source: format_err!("unsupported codec {codec}"),
57            });
58        }
59        let decoder = StreamProcessor::create_decoder(codec.mime_type()?, Some(codec.oob_bytes()))
60            .map_err(|e| Error::audio_core(format_err!("creating decoder: {e:?}")))?;
61        let encoder = StreamProcessor::create_encoder(codec.try_into()?, codec.try_into()?)
62            .map_err(|e| Error::audio_core(format_err!("creating encoder: {e:?}")))?;
63        Ok(Self {
64            sco: connection,
65            decoder,
66            encoder,
67            audio_frame_sink,
68            audio_frame_stream,
69            codec,
70            event_sender,
71        })
72    }
73
74    async fn encoder_to_sco(
75        mut encoded_stream: StreamProcessorOutputStream,
76        proxy: bredr::ScoConnectionProxy,
77        codec: CodecId,
78    ) -> Error {
79        // Pre-allocate the packet vector and reuse to avoid allocating for every packet.
80        let packet: Vec<u8> = vec![0; 60]; // SCO has 60 byte packets
81        let mut request =
82            bredr::ScoConnectionWriteRequest { data: Some(packet), ..Default::default() };
83
84        const MSBC_ENCODED_LEN: usize = 57; // Length of a MSBC packet after encoding.
85        if codec == CodecId::MSBC {
86            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
87            packet[0] = 0x01; // H2 header has a constant part (0b1000_0000_0001_AABB) with AABB
88            // cycling 0000, 0011, 1100, 1111
89        }
90        // The H2 Header marker cycle, with the constant part
91        let mut h2_marker = [0x08u8, 0x38, 0xc8, 0xf8].iter().cycle();
92        loop {
93            match encoded_stream.next().await {
94                Some(Ok(encoded)) => {
95                    if codec == CodecId::MSBC {
96                        if encoded.len() % MSBC_ENCODED_LEN != 0 {
97                            warn!("Got {} bytes, uneven number of packets", encoded.len());
98                        }
99                        for sbc_packet in encoded.as_slice().chunks_exact(MSBC_ENCODED_LEN) {
100                            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
101                            packet[1] = *h2_marker.next().unwrap();
102                            packet[2..59].copy_from_slice(sbc_packet);
103                            if let Err(e) = proxy.write(&request).await {
104                                return e.into();
105                            }
106                        }
107                    } else {
108                        // CVSD has no padding or header. Encoder sends us multiples of 60 bytes as
109                        // long as we provide a multiple of 7.5ms audio packets.
110                        for cvsd_packet in encoded.as_slice().chunks_exact(60) {
111                            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
112                            packet.copy_from_slice(cvsd_packet);
113                            if let Err(e) = proxy.write(&request).await {
114                                return e.into();
115                            }
116                        }
117                    }
118                }
119                Some(Err(e)) => {
120                    warn!("Error in encoding: {e:?}");
121                    return Error::audio_core(format_err!("Couldn't read encoded: {e:?}"));
122                }
123                None => {
124                    warn!("Error in encoding: Stream is ended!");
125                    return Error::audio_core(format_err!("Encoder stream ended early"));
126                }
127            }
128        }
129    }
130
131    async fn pcm_to_encoder(mut encoder: StreamProcessor, mut stream: AudioFrameStream) -> Error {
132        loop {
133            match stream.next().await {
134                Some(Ok(pcm)) => {
135                    if let Err(e) = encoder.write_all(pcm.as_slice()).await {
136                        return Error::audio_core(format_err!("write to encoder: {e:?}"));
137                    }
138                    // Packets should be exactly the right size.
139                    if let Err(e) = encoder.flush().await {
140                        return Error::audio_core(format_err!("flush encoder: {e:?}"));
141                    }
142                }
143                Some(Err(e)) => {
144                    warn!("Audio output error: {e:?}");
145                    return Error::audio_core(format_err!("output error: {e:?}"));
146                }
147                None => {
148                    warn!("Ran out of audio input!");
149                    return Error::audio_core(format_err!("Audio input end"));
150                }
151            }
152        }
153    }
154
155    async fn decoder_to_pcm(
156        mut decoded_stream: StreamProcessorOutputStream,
157        mut sink: AudioFrameSink,
158    ) -> Error {
159        let mut decoded_packets = 0;
160        loop {
161            match decoded_stream.next().await {
162                Some(Ok(decoded)) => {
163                    decoded_packets += 1;
164                    if decoded_packets % 500 == 0 {
165                        info!(
166                            "Got {} decoded bytes from decoder: {decoded_packets} packets",
167                            decoded.len()
168                        );
169                    }
170                    if let Err(e) = sink.write_all(decoded.as_slice()).await {
171                        warn!("Error sending to sink: {e:?}");
172                        return Error::audio_core(format_err!("send to sink: {e:?}"));
173                    }
174                }
175                Some(Err(e)) => {
176                    warn!("Error in decoding: {e:?}");
177                    return Error::audio_core(format_err!("Couldn't read decoder: {e:?}"));
178                }
179                None => {
180                    warn!("Error in decoding: Stream is ended!");
181                    return Error::audio_core(format_err!("Decoder stream ended early"));
182                }
183            }
184        }
185    }
186
187    async fn sco_to_decoder(
188        proxy: bredr::ScoConnectionProxy,
189        mut decoder: StreamProcessor,
190        codec: CodecId,
191    ) -> Error {
192        loop {
193            let data = match proxy.read().await {
194                Ok(bredr::ScoConnectionReadResponse { data: Some(data), .. }) => data,
195                Ok(_) => return Error::audio_core(format_err!("Invalid Read response")),
196                Err(e) => return e.into(),
197            };
198            let packet = match codec {
199                CodecId::CVSD => data.as_slice(),
200                CodecId::MSBC => {
201                    // H2 Header (two octets) is present on packets when WBS is used
202                    let (_header, packet) = data.as_slice().split_at(2);
203                    if packet[0] != 0xad {
204                        info!(
205                            "Packet didn't start with syncword: {:#02x} {}",
206                            packet[0],
207                            packet.len()
208                        );
209                    }
210                    packet
211                }
212                _ => {
213                    return Error::UnsupportedParameters {
214                        source: format_err!("Unknown CodecId: {codec:?}"),
215                    };
216                }
217            };
218            if let Err(e) = decoder.write_all(packet).await {
219                return Error::audio_core(format_err!("Failed to write to decoder: {e:?}"));
220            }
221            // TODO(https://fxbug.dev/42073275): buffer some packets before flushing instead of flushing on
222            // every one.
223            if let Err(e) = decoder.flush().await {
224                return Error::audio_core(format_err!("Failed to flush decoder: {e:?}"));
225            }
226        }
227    }
228
229    async fn run(mut self) {
230        let peer_id = self.sco.peer_id;
231        let Ok(encoded_stream) = self.encoder.take_output_stream() else {
232            error!("Couldn't take encoder output stream");
233            return;
234        };
235        let sco_write =
236            AudioSession::encoder_to_sco(encoded_stream, self.sco.proxy.clone(), self.codec);
237        let sco_write = pin!(sco_write);
238        let audio_to_encoder = AudioSession::pcm_to_encoder(self.encoder, self.audio_frame_stream);
239        let audio_to_encoder = pin!(audio_to_encoder);
240
241        let Ok(decoded_stream) = self.decoder.take_output_stream() else {
242            error!("Couldn't take decoder output stream");
243            return;
244        };
245        let decoder_to_sink =
246            pin!(AudioSession::decoder_to_pcm(decoded_stream, self.audio_frame_sink));
247        let sco_read =
248            AudioSession::sco_to_decoder(self.sco.proxy.clone(), self.decoder, self.codec);
249        let sco_read = pin!(sco_read);
250        let e = futures::select! {
251            e = audio_to_encoder.fuse() => { warn!(e:?; "PCM to encoder write"); e},
252            e = sco_write.fuse() => { warn!(e:?; "Write encoded to SCO"); e},
253            e = sco_read.fuse() => { warn!(e:?; "SCO read to decoder"); e},
254            e = decoder_to_sink.fuse() => { warn!(e:?; "SCO decoder to PCM"); e},
255        };
256        let _ = self.event_sender.try_send(ControlEvent::Stopped { id: peer_id, error: Some(e) });
257    }
258
259    fn start(self) -> fasync::Task<()> {
260        fasync::Task::spawn(self.run())
261    }
262}
263
264impl InbandControl {
265    pub fn create(proxy: AudioDeviceEnumeratorProxy) -> Result<Self, Error> {
266        let (sender, receiver) = futures::channel::mpsc::channel(1);
267        Ok(Self {
268            audio_core: proxy,
269            session_task: None,
270            event_sender: Mutex::new(sender),
271            stream: Mutex::new(Some(receiver)),
272        })
273    }
274
275    fn running_id(&mut self) -> Option<PeerId> {
276        self.session_task
277            .as_mut()
278            .and_then(|(running, task)| {
279                let mut cx = Context::from_waker(&std::task::Waker::noop());
280                // We are the only thing that polls this task, so we are ok to poll it and throw away a
281                // wake.
282                task.poll_unpin(&mut cx).is_pending().then_some(running)
283            })
284            .copied()
285    }
286
    // Clock domain passed to the audio devices created by `start_input` and `start_output`.
    // NOTE(review): presumably 0 denotes the local monotonic clock, as the name says — confirm.
    const LOCAL_MONOTONIC_CLOCK_DOMAIN: u32 = 0;

    // Buffer duration passed to the audio devices created by `start_input` and `start_output`.
    // This is currently 2x an SCO frame, which holds 7.5ms.
    // This must be a multiple of 7.5ms for the CVSD encoder to not have any remainder bytes.
    const AUDIO_BUFFER_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(15);
292
293    fn start_input(&mut self, peer_id: PeerId, codec_id: CodecId) -> Result<AudioFrameSink, Error> {
294        let audio_dev_id = peer_audio_stream_id(peer_id, HF_INPUT_UUID);
295        let (client, sink) = SoftStreamConfig::create_input(
296            &audio_dev_id,
297            "Fuchsia",
298            super::DEVICE_NAME,
299            Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
300            codec_id.try_into()?,
301            Self::AUDIO_BUFFER_DURATION,
302        )
303        .map_err(|e| Error::audio_core(format_err!("Couldn't create input: {e:?}")))?;
304
305        self.audio_core.add_device_by_channel(super::DEVICE_NAME, true, client)?;
306        Ok(sink)
307    }
308
309    fn start_output(
310        &mut self,
311        peer_id: PeerId,
312        codec_id: CodecId,
313    ) -> Result<AudioFrameStream, Error> {
314        let audio_dev_id = peer_audio_stream_id(peer_id, HF_OUTPUT_UUID);
315        let (client, stream) = SoftStreamConfig::create_output(
316            &audio_dev_id,
317            "Fuchsia",
318            super::DEVICE_NAME,
319            Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
320            codec_id.try_into()?,
321            Self::AUDIO_BUFFER_DURATION,
322            zx::MonotonicDuration::from_millis(0),
323        )
324        .map_err(|e| Error::audio_core(format_err!("Couldn't create output: {e:?}")))?;
325        self.audio_core.add_device_by_channel(super::DEVICE_NAME, false, client)?;
326        Ok(stream)
327    }
328}
329
330impl Control for InbandControl {
331    fn start(
332        &mut self,
333        id: PeerId,
334        connection: sco::Connection,
335        codec: CodecId,
336    ) -> Result<(), Error> {
337        if let Some(running) = self.running_id() {
338            if running == id {
339                return Err(Error::AlreadyStarted);
340            }
341            return Err(Error::UnsupportedParameters {
342                source: format_err!("Only one peer can be started inband at once"),
343            });
344        }
345        let frame_sink = self.start_input(id, codec)?;
346        let frame_stream = self.start_output(id, codec)?;
347        let session = AudioSession::setup(
348            connection,
349            codec,
350            frame_sink,
351            frame_stream,
352            self.event_sender.lock().clone(),
353        )?;
354        self.session_task = Some((id, session.start()));
355        Ok(())
356    }
357
358    fn stop(&mut self, id: PeerId) -> Result<(), Error> {
359        if self.running_id() != Some(id) {
360            return Err(Error::NotStarted);
361        }
362        self.session_task = None;
363        let _ = self.event_sender.get_mut().try_send(ControlEvent::Stopped { id, error: None });
364        Ok(())
365    }
366
    /// Intentionally a no-op: both arguments are ignored.
    fn connect(&mut self, _id: PeerId, _supported_codecs: &[CodecId]) {
        // Nothing to do here
    }
370
371    fn disconnect(&mut self, id: PeerId) {
372        let _ = self.stop(id);
373    }
374
375    fn take_events(&self) -> BoxStream<'static, ControlEvent> {
376        self.stream.lock().take().unwrap().boxed()
377    }
378
    /// Intentionally a no-op: both arguments are ignored.
    fn failed_request(&self, _request: ControlEvent, _error: Error) {
        // We send no requests, so ignore this.
    }
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387
388    use fidl_fuchsia_bluetooth_bredr::ScoConnectionRequestStream;
389
390    use crate::sco::test_utils::connection_for_codec;
391
    /// A "Zero input response" SBC packet.  This is what SBC encodes to (with the MSBC settings)
    /// when passed a flat input at zero.  Each packet represents 7.5 milliseconds of audio.
    ///
    /// The array is a full 60-byte SCO packet: the first two bytes sit where the H2 header goes
    /// (tests comparing encoder output skip them), and the byte after them is the 0xad syncword.
    const ZERO_INPUT_SBC_PACKET: [u8; 60] = [
        0x80, 0x10, 0xad, 0x00, 0x00, 0xc5, 0x00, 0x00, 0x00, 0x00, 0x77, 0x6d, 0xb6, 0xdd, 0xdb,
        0x6d, 0xb7, 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7,
        0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb,
        0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb, 0x6c, 0x00,
    ];
400
    /// A "zero input response" CVSD packet: a full 60-byte SCO packet in which every byte is
    /// 0x55.
    const ZERO_INPUT_CVSD_PACKET: [u8; 60] = [0x55; 60];
403
    /// The kind of SCO connection request that `process_sco_request` handled.
    #[derive(PartialEq, Debug)]
    enum ProcessedRequest {
        /// A read request, which was answered with the data supplied by the caller.
        ScoRead,
        /// A write request, carrying the bytes that were written.
        ScoWrite(Vec<u8>),
    }
409
410    // Processes one sco request.  Returns true if the stream was ended.
411    async fn process_sco_request(
412        sco_request_stream: &mut ScoConnectionRequestStream,
413        read_data: Vec<u8>,
414    ) -> Option<ProcessedRequest> {
415        match sco_request_stream.next().await {
416            Some(Ok(bredr::ScoConnectionRequest::Read { responder })) => {
417                let response = bredr::ScoConnectionReadResponse {
418                    status_flag: Some(bredr::RxPacketStatus::CorrectlyReceivedData),
419                    data: Some(read_data),
420                    ..Default::default()
421                };
422                responder.send(&response).expect("sends okay");
423                Some(ProcessedRequest::ScoRead)
424            }
425            Some(Ok(bredr::ScoConnectionRequest::Write { payload, responder })) => {
426                responder.send().expect("response to write");
427                Some(ProcessedRequest::ScoWrite(payload.data.unwrap()))
428            }
429            None => None,
430            x => panic!("Expected read or write requests, got {x:?}"),
431        }
432    }
433
434    #[fuchsia::test]
435    async fn reads_audio_from_connection() {
436        let (proxy, _audio_enumerator_requests) =
437            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
438        let mut control = InbandControl::create(proxy).unwrap();
439
440        let (connection, mut sco_request_stream) =
441            connection_for_codec(PeerId(1), CodecId::MSBC, true);
442
443        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
444
445        let (connection2, _request_stream) = connection_for_codec(PeerId(1), CodecId::MSBC, true);
446        let _ = control
447            .start(PeerId(1), connection2, CodecId::MSBC)
448            .expect_err("Starting twice shouldn't be allowed");
449
450        // Test note: 10 packets is not enough to force a write to audio, which will stall this test if
451        // it's not started.
452        for _ in 1..10 {
453            assert_eq!(
454                Some(ProcessedRequest::ScoRead),
455                process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
456            );
457        }
458
459        control.stop(PeerId(1)).expect("should be able to stop");
460        let _ = control.stop(PeerId(1)).expect_err("can't stop a stopped thing");
461
462        // Should be able to drain the requests.
463        let mut extra_requests = 0;
464        while let Some(r) =
465            process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
466        {
467            assert_eq!(ProcessedRequest::ScoRead, r);
468            extra_requests += 1;
469        }
470
471        info!("Got {extra_requests} extra ScoConnectionProxy Requests after stop");
472    }
473
474    #[fuchsia::test]
475    async fn audio_setup_error_bad_codec() {
476        let (proxy, _) =
477            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
478        let mut control = InbandControl::create(proxy).unwrap();
479
480        let (connection, _sco_request_stream) =
481            connection_for_codec(PeerId(1), CodecId::MSBC, true);
482        let res = control.start(PeerId(1), connection, 0xD0u8.into());
483        assert!(res.is_err());
484    }
485
486    #[fuchsia::test]
487    async fn decode_sco_audio_path() {
488        use fidl_fuchsia_hardware_audio as audio;
489        let (proxy, mut audio_enumerator_requests) =
490            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
491        let mut control = InbandControl::create(proxy).unwrap();
492
493        let (connection, mut sco_request_stream) =
494            connection_for_codec(PeerId(1), CodecId::MSBC, true);
495
496        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
497
498        let audio_input_stream_config;
499        let mut _audio_output_stream_config;
500        loop {
501            match audio_enumerator_requests.next().await {
502                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
503                    is_input,
504                    channel,
505                    ..
506                })) => {
507                    if is_input {
508                        audio_input_stream_config = channel.into_proxy();
509                        break;
510                    } else {
511                        _audio_output_stream_config = channel.into_proxy();
512                    }
513                }
514                x => panic!("Expected audio device by channel, got {x:?}"),
515            }
516        }
517
518        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
519        audio_input_stream_config
520            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
521            .expect("create ring buffer");
522
523        // We need to write to the stream at least once to start it up.
524        assert_eq!(
525            Some(ProcessedRequest::ScoRead),
526            process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
527        );
528
529        let notifications_per_ring = 20;
530        // Request a 1-second audio buffer. This is guaranteed to be greater than 1 second, since
531        // the driver must reserve any space it needs ON TOP OF the client-requested 16000 bytes.
532        let (frames, _vmo) = ring_buffer
533            .get_vmo(16000, notifications_per_ring)
534            .await
535            .expect("fidl")
536            .expect("response");
537
538        // To be deterministic, we set the first notification before even starting the ring-buffer.
539        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
540        let mut position_notifications = 0;
541
542        let _ = ring_buffer.start().await;
543
544        // For 100 MSBC Audio frames, we get 7.5 x 100 = 750 milliseconds, or 12000 frames.
545        let frames_per_notification = frames / notifications_per_ring;
546        // As noted above, `frames` > 16000, so `frames_per_notification` > 800. Assuming the ring-
547        // buffer is < 17000 frames, `expected_notifications` will be 14.xx (as u32: 14), not 15.
548        let expected_notifications = 12000 / frames_per_notification;
549
550        // We might receive the first notification as early as ring-buffer position 0,
551        // so we check for a notification before processing the first chunk of data.
552        if position_info.poll_unpin(&mut Context::from_waker(&std::task::Waker::noop())).is_ready()
553        {
554            position_notifications += 1;
555            position_info = ring_buffer.watch_clock_recovery_position_info();
556        }
557        for _ in 1..100 {
558            assert_eq!(
559                Some(ProcessedRequest::ScoRead),
560                process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
561            );
562            // We are the only ones polling position_info, so we can ignore wakeups (noop waker).
563            if position_info
564                .poll_unpin(&mut Context::from_waker(&std::task::Waker::noop()))
565                .is_ready()
566            {
567                position_notifications += 1;
568                position_info = ring_buffer.watch_clock_recovery_position_info();
569            }
570        }
571
572        // The audio driver protocol require notification VALUES [timestamp, position] to tightly
573        // correlate. It is less concerned with notification ARRIVAL TIMES; these could occur up to
574        // 1 notification's duration early. Thus, if we expect X notifications, then we allow X+1.
575        assert!(position_notifications >= expected_notifications);
576        assert!(position_notifications <= expected_notifications + 1);
577    }
578
579    #[fuchsia::test]
580    async fn encode_sco_audio_path_msbc() {
581        use fidl_fuchsia_hardware_audio as audio;
582        let (proxy, mut audio_enumerator_requests) =
583            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
584        let mut control = InbandControl::create(proxy).unwrap();
585
586        let (connection, mut sco_request_stream) =
587            connection_for_codec(PeerId(1), CodecId::MSBC, true);
588
589        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
590
591        let audio_output_stream_config;
592        let mut _audio_input_stream_config;
593        loop {
594            match audio_enumerator_requests.next().await {
595                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
596                    is_input,
597                    channel,
598                    ..
599                })) => {
600                    if !is_input {
601                        audio_output_stream_config = channel.into_proxy();
602                        break;
603                    } else {
604                        _audio_input_stream_config = channel.into_proxy();
605                    }
606                }
607                x => panic!("Expected audio device by channel, got {x:?}"),
608            }
609        }
610
611        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
612        audio_output_stream_config
613            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
614            .unwrap();
615
616        // Note: we don't need to read from the stream to start it, it gets polled automatically by
617        // the read task.
618
619        let notifications_per_ring = 20;
620        // Request at least 1 second of audio buffer.
621        let (_frames, _vmo) = ring_buffer
622            .get_vmo(16000, notifications_per_ring)
623            .await
624            .expect("fidl")
625            .expect("response");
626
627        let _ = ring_buffer.start().await;
628
629        // Expect 100 MSBC Audio frames, which should take ~ 750 milliseconds.
630        let next_header = &mut [0x01, 0x08];
631        for _sco_frame in 1..100 {
632            'sco: loop {
633                match process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec())
634                    .await
635                {
636                    Some(ProcessedRequest::ScoRead) => continue 'sco,
637                    Some(ProcessedRequest::ScoWrite(data)) => {
638                        assert_eq!(60, data.len());
639                        // Skip the H2 header which changes for every packet.
640                        assert_eq!(&ZERO_INPUT_SBC_PACKET[2..], &data[2..]);
641                        assert_eq!(next_header, &data[0..2]);
642                        // Prep for the next heade
643                        match next_header[1] {
644                            0x08 => next_header[1] = 0x38,
645                            0x38 => next_header[1] = 0xc8,
646                            0xc8 => next_header[1] = 0xf8,
647                            0xf8 => next_header[1] = 0x08,
648                            _ => unreachable!(),
649                        };
650                        break 'sco;
651                    }
652                    x => panic!("Expected read or write but got {x:?}"),
653                };
654            }
655        }
656    }
657
658    #[fuchsia::test]
659    async fn encode_sco_audio_path_cvsd() {
660        use fidl_fuchsia_hardware_audio as audio;
661        let (proxy, mut audio_enumerator_requests) =
662            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
663        let mut control = InbandControl::create(proxy).unwrap();
664
665        let (connection, mut sco_request_stream) =
666            connection_for_codec(PeerId(1), CodecId::CVSD, true);
667
668        control.start(PeerId(1), connection, CodecId::CVSD).expect("should be able to start");
669
670        let audio_output_stream_config;
671        let mut _audio_input_stream_config;
672        loop {
673            match audio_enumerator_requests.next().await {
674                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
675                    is_input,
676                    channel,
677                    ..
678                })) => {
679                    if !is_input {
680                        audio_output_stream_config = channel.into_proxy();
681                        break;
682                    } else {
683                        _audio_input_stream_config = channel.into_proxy();
684                    }
685                }
686                x => panic!("Expected audio device by channel, got {x:?}"),
687            }
688        }
689
690        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
691        audio_output_stream_config
692            .create_ring_buffer(&CodecId::CVSD.try_into().unwrap(), server)
693            .unwrap();
694
695        // Note: we don't need to read from the stream to start it, it gets polled automatically by
696        // the read task.
697
698        let notifications_per_ring = 10;
699        // Request at least 1 second of audio buffer.
700        let (_frames, _vmo) = ring_buffer
701            .get_vmo(64000, notifications_per_ring)
702            .await
703            .expect("fidl")
704            .expect("response");
705
706        let _ = ring_buffer.start().await;
707
708        // Expect 100 CVSD Audio frames, which should take ~ 750 milliseconds.
709        for _sco_frame in 1..100 {
710            'sco: loop {
711                match process_sco_request(&mut sco_request_stream, ZERO_INPUT_CVSD_PACKET.to_vec())
712                    .await
713                {
714                    Some(ProcessedRequest::ScoRead) => continue 'sco,
715                    Some(ProcessedRequest::ScoWrite(data)) => {
716                        // Confirm the data is right
717                        assert_eq!(60, data.len());
718                        assert_eq!(&ZERO_INPUT_CVSD_PACKET, data.as_slice());
719                        break 'sco;
720                    }
721                    x => panic!("Expected read or write but got {x:?}"),
722                };
723            }
724        }
725    }
726
727    #[fuchsia::test]
728    async fn read_from_audio_output() {
729        use fidl_fuchsia_hardware_audio as audio;
730        let (proxy, mut audio_enumerator_requests) =
731            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
732        let mut control = InbandControl::create(proxy).unwrap();
733
734        let (connection, mut sco_request_stream) =
735            connection_for_codec(PeerId(1), CodecId::MSBC, true);
736
737        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
738
739        let audio_output_stream_config;
740        let mut _audio_input_stream_config;
741        loop {
742            match audio_enumerator_requests.next().await {
743                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
744                    is_input,
745                    channel,
746                    ..
747                })) => {
748                    if !is_input {
749                        audio_output_stream_config = channel.into_proxy();
750                        break;
751                    } else {
752                        _audio_input_stream_config = channel.into_proxy();
753                    }
754                }
755                x => panic!("Expected audio device by channel, got {x:?}"),
756            }
757        }
758
759        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
760        audio_output_stream_config
761            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
762            .expect("create ring buffer");
763
764        let notifications_per_ring = 20;
765        // Request at least 1 second of audio buffer.
766        let (_frames, _vmo) = ring_buffer
767            .get_vmo(16000, notifications_per_ring)
768            .await
769            .expect("fidl")
770            .expect("response");
771
772        let _ = ring_buffer.start().await;
773
774        // We should be just reading from the audio output, track via position notifications.
775        // 20 position notifications happen in one second.
776        'position_notifications: for i in 1..20 {
777            let mut position_info = ring_buffer.watch_clock_recovery_position_info();
778            loop {
779                let sco_activity = Box::pin(process_sco_request(
780                    &mut sco_request_stream,
781                    ZERO_INPUT_SBC_PACKET.to_vec(),
782                ));
783                use futures::future::Either;
784                match futures::future::select(position_info, sco_activity).await {
785                    Either::Left((result, _sco_fut)) => {
786                        assert!(result.is_ok(), "Position Info failed at {i}");
787                        continue 'position_notifications;
788                    }
789                    Either::Right((_sco_pkt, position_info_fut)) => {
790                        position_info = position_info_fut;
791                    }
792                }
793            }
794        }
795    }
796
797    #[fuchsia::test]
798    async fn audio_output_error_sends_to_events() {
799        let (proxy, mut audio_enumerator_requests) =
800            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
801        let mut control = InbandControl::create(proxy).unwrap();
802        let mut events = control.take_events();
803
804        let (connection, _sco_request_stream) =
805            connection_for_codec(PeerId(1), CodecId::MSBC, true);
806
807        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
808
809        let audio_output_stream_config;
810        let mut _audio_input_stream_config;
811        loop {
812            match audio_enumerator_requests.next().await {
813                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
814                    is_input,
815                    channel,
816                    ..
817                })) => {
818                    if !is_input {
819                        audio_output_stream_config = channel.into_proxy();
820                        break;
821                    } else {
822                        _audio_input_stream_config = channel.into_proxy();
823                    }
824                }
825                x => panic!("Expected audio device by channel, got {x:?}"),
826            }
827        }
828
829        drop(audio_output_stream_config);
830
831        // Events should produce an error because there was an issue with audio output.
832        match events.next().await {
833            Some(ControlEvent::Stopped { id, error: Some(_) }) => {
834                assert_eq!(PeerId(1), id);
835            }
836            x => panic!("Expected the peer to have error stop, but got {x:?}"),
837        };
838    }
839}