bt_hfp/audio/
inband.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use fuchsia_audio_codec::{StreamProcessor, StreamProcessorOutputStream};
7use fuchsia_audio_device::stream_config::SoftStreamConfig;
8use fuchsia_audio_device::{AudioFrameSink, AudioFrameStream};
9use fuchsia_bluetooth::types::{peer_audio_stream_id, PeerId};
10use fuchsia_sync::Mutex;
11use futures::stream::BoxStream;
12use futures::task::Context;
13use futures::{AsyncWriteExt, FutureExt, StreamExt};
14use log::{error, info, warn};
15use media::AudioDeviceEnumeratorProxy;
16use std::pin::pin;
17use {fidl_fuchsia_bluetooth_bredr as bredr, fidl_fuchsia_media as media, fuchsia_async as fasync};
18
19use crate::audio::{Control, ControlEvent, Error, HF_INPUT_UUID, HF_OUTPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
/// Audio Control for inband audio, i.e. encoding and decoding audio before sending them
/// to the controller via HCI (in contrast to offloaded audio).
pub struct InbandControl {
    /// Proxy used to register the input and output audio devices (`add_device_by_channel`).
    audio_core: media::AudioDeviceEnumeratorProxy,
    /// Peer of the most recently started session together with the task running it.
    /// Set by `start` and reset to `None` by `stop`.
    session_task: Option<(PeerId, fasync::Task<()>)>,
    /// Sending half of the event channel; a clone is handed to every `AudioSession`.
    event_sender: Mutex<futures::channel::mpsc::Sender<ControlEvent>>,
    /// Receiving half of the event channel, kept here until `take_events` removes it.
    stream: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
}
31
// Setup for a running AudioSession.
// AudioSession::run() consumes the session and should handle the data path in both directions:
//   - SCO -> decoder -> audio_core input (audio_frame_sink)
//   - audio_core output -> encoder -> SCO
struct AudioSession {
    // Receives the decoded PCM audio (written by `decoder_to_pcm`).
    audio_frame_sink: AudioFrameSink,
    // Source of PCM audio that is fed to the encoder (read by `pcm_to_encoder`).
    audio_frame_stream: AudioFrameStream,
    // SCO connection; supplies the peer id and the proxy used for reads and writes.
    sco: sco::Connection,
    // Codec in use; decides how SCO packets are framed and unframed.
    codec: CodecId,
    // Decodes audio read from the SCO connection.
    decoder: StreamProcessor,
    // Encodes audio before it is written to the SCO connection.
    encoder: StreamProcessor,
    // Used by `run` to report `ControlEvent::Stopped` when the session ends.
    event_sender: futures::channel::mpsc::Sender<ControlEvent>,
}
45
46impl AudioSession {
47    fn setup(
48        connection: sco::Connection,
49        codec: CodecId,
50        audio_frame_sink: AudioFrameSink,
51        audio_frame_stream: AudioFrameStream,
52        event_sender: futures::channel::mpsc::Sender<ControlEvent>,
53    ) -> Result<Self, Error> {
54        if !codec.is_supported() {
55            return Err(Error::UnsupportedParameters {
56                source: format_err!("unsupported codec {codec}"),
57            });
58        }
59        let decoder = StreamProcessor::create_decoder(codec.mime_type()?, Some(codec.oob_bytes()))
60            .map_err(|e| Error::audio_core(format_err!("creating decoder: {e:?}")))?;
61        let encoder = StreamProcessor::create_encoder(codec.try_into()?, codec.try_into()?)
62            .map_err(|e| Error::audio_core(format_err!("creating encoder: {e:?}")))?;
63        Ok(Self {
64            sco: connection,
65            decoder,
66            encoder,
67            audio_frame_sink,
68            audio_frame_stream,
69            codec,
70            event_sender,
71        })
72    }
73
74    async fn encoder_to_sco(
75        mut encoded_stream: StreamProcessorOutputStream,
76        proxy: bredr::ScoConnectionProxy,
77        codec: CodecId,
78    ) -> Error {
79        // Pre-allocate the packet vector and reuse to avoid allocating for every packet.
80        let packet: Vec<u8> = vec![0; 60]; // SCO has 60 byte packets
81        let mut request =
82            bredr::ScoConnectionWriteRequest { data: Some(packet), ..Default::default() };
83
84        const MSBC_ENCODED_LEN: usize = 57; // Length of a MSBC packet after encoding.
85        if codec == CodecId::MSBC {
86            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
87            packet[0] = 0x01; // H2 header has a constant part (0b1000_0000_0001_AABB) with AABB
88                              // cycling 0000, 0011, 1100, 1111
89        }
90        // The H2 Header marker cycle, with the constant part
91        let mut h2_marker = [0x08u8, 0x38, 0xc8, 0xf8].iter().cycle();
92        loop {
93            match encoded_stream.next().await {
94                Some(Ok(encoded)) => {
95                    if codec == CodecId::MSBC {
96                        if encoded.len() % MSBC_ENCODED_LEN != 0 {
97                            warn!("Got {} bytes, uneven number of packets", encoded.len());
98                        }
99                        for sbc_packet in encoded.as_slice().chunks_exact(MSBC_ENCODED_LEN) {
100                            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
101                            packet[1] = *h2_marker.next().unwrap();
102                            packet[2..59].copy_from_slice(sbc_packet);
103                            if let Err(e) = proxy.write(&request).await {
104                                return e.into();
105                            }
106                        }
107                    } else {
108                        // CVSD has no padding or header. Encoder sends us multiples of 60 bytes as
109                        // long as we provide a multiple of 7.5ms audio packets.
110                        for cvsd_packet in encoded.as_slice().chunks_exact(60) {
111                            let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
112                            packet.copy_from_slice(cvsd_packet);
113                            if let Err(e) = proxy.write(&request).await {
114                                return e.into();
115                            }
116                        }
117                    }
118                }
119                Some(Err(e)) => {
120                    warn!("Error in encoding: {e:?}");
121                    return Error::audio_core(format_err!("Couldn't read encoded: {e:?}"));
122                }
123                None => {
124                    warn!("Error in encoding: Stream is ended!");
125                    return Error::audio_core(format_err!("Encoder stream ended early"));
126                }
127            }
128        }
129    }
130
131    async fn pcm_to_encoder(mut encoder: StreamProcessor, mut stream: AudioFrameStream) -> Error {
132        loop {
133            match stream.next().await {
134                Some(Ok(pcm)) => {
135                    if let Err(e) = encoder.write_all(pcm.as_slice()).await {
136                        return Error::audio_core(format_err!("write to encoder: {e:?}"));
137                    }
138                    // Packets should be exactly the right size.
139                    if let Err(e) = encoder.flush().await {
140                        return Error::audio_core(format_err!("flush encoder: {e:?}"));
141                    }
142                }
143                Some(Err(e)) => {
144                    warn!("Audio output error: {e:?}");
145                    return Error::audio_core(format_err!("output error: {e:?}"));
146                }
147                None => {
148                    warn!("Ran out of audio input!");
149                    return Error::audio_core(format_err!("Audio input end"));
150                }
151            }
152        }
153    }
154
155    async fn decoder_to_pcm(
156        mut decoded_stream: StreamProcessorOutputStream,
157        mut sink: AudioFrameSink,
158    ) -> Error {
159        let mut decoded_packets = 0;
160        loop {
161            match decoded_stream.next().await {
162                Some(Ok(decoded)) => {
163                    decoded_packets += 1;
164                    if decoded_packets % 500 == 0 {
165                        info!(
166                            "Got {} decoded bytes from decoder: {decoded_packets} packets",
167                            decoded.len()
168                        );
169                    }
170                    if let Err(e) = sink.write_all(decoded.as_slice()).await {
171                        warn!("Error sending to sink: {e:?}");
172                        return Error::audio_core(format_err!("send to sink: {e:?}"));
173                    }
174                }
175                Some(Err(e)) => {
176                    warn!("Error in decoding: {e:?}");
177                    return Error::audio_core(format_err!("Couldn't read decoder: {e:?}"));
178                }
179                None => {
180                    warn!("Error in decoding: Stream is ended!");
181                    return Error::audio_core(format_err!("Decoder stream ended early"));
182                }
183            }
184        }
185    }
186
187    async fn sco_to_decoder(
188        proxy: bredr::ScoConnectionProxy,
189        mut decoder: StreamProcessor,
190        codec: CodecId,
191    ) -> Error {
192        loop {
193            let data = match proxy.read().await {
194                Ok(bredr::ScoConnectionReadResponse { data: Some(data), .. }) => data,
195                Ok(_) => return Error::audio_core(format_err!("Invalid Read response")),
196                Err(e) => return e.into(),
197            };
198            let packet = match codec {
199                CodecId::CVSD => data.as_slice(),
200                CodecId::MSBC => {
201                    // H2 Header (two octets) is present on packets when WBS is used
202                    let (_header, packet) = data.as_slice().split_at(2);
203                    if packet[0] != 0xad {
204                        info!(
205                            "Packet didn't start with syncword: {:#02x} {}",
206                            packet[0],
207                            packet.len()
208                        );
209                    }
210                    packet
211                }
212                _ => {
213                    return Error::UnsupportedParameters {
214                        source: format_err!("Unknown CodecId: {codec:?}"),
215                    }
216                }
217            };
218            if let Err(e) = decoder.write_all(packet).await {
219                return Error::audio_core(format_err!("Failed to write to decoder: {e:?}"));
220            }
221            // TODO(https://fxbug.dev/42073275): buffer some packets before flushing instead of flushing on
222            // every one.
223            if let Err(e) = decoder.flush().await {
224                return Error::audio_core(format_err!("Failed to flush decoder: {e:?}"));
225            }
226        }
227    }
228
229    async fn run(mut self) {
230        let peer_id = self.sco.peer_id;
231        let Ok(encoded_stream) = self.encoder.take_output_stream() else {
232            error!("Couldn't take encoder output stream");
233            return;
234        };
235        let sco_write =
236            AudioSession::encoder_to_sco(encoded_stream, self.sco.proxy.clone(), self.codec);
237        let sco_write = pin!(sco_write);
238        let audio_to_encoder = AudioSession::pcm_to_encoder(self.encoder, self.audio_frame_stream);
239        let audio_to_encoder = pin!(audio_to_encoder);
240
241        let Ok(decoded_stream) = self.decoder.take_output_stream() else {
242            error!("Couldn't take decoder output stream");
243            return;
244        };
245        let decoder_to_sink =
246            pin!(AudioSession::decoder_to_pcm(decoded_stream, self.audio_frame_sink));
247        let sco_read =
248            AudioSession::sco_to_decoder(self.sco.proxy.clone(), self.decoder, self.codec);
249        let sco_read = pin!(sco_read);
250        let e = futures::select! {
251            e = audio_to_encoder.fuse() => { warn!(e:?; "PCM to encoder write"); e},
252            e = sco_write.fuse() => { warn!(e:?; "Write encoded to SCO"); e},
253            e = sco_read.fuse() => { warn!(e:?; "SCO read to decoder"); e},
254            e = decoder_to_sink.fuse() => { warn!(e:?; "SCO decoder to PCM"); e},
255        };
256        let _ = self.event_sender.try_send(ControlEvent::Stopped { id: peer_id, error: Some(e) });
257    }
258
259    fn start(self) -> fasync::Task<()> {
260        fasync::Task::spawn(self.run())
261    }
262}
263
264impl InbandControl {
265    pub fn create(proxy: AudioDeviceEnumeratorProxy) -> Result<Self, Error> {
266        let (sender, receiver) = futures::channel::mpsc::channel(1);
267        Ok(Self {
268            audio_core: proxy,
269            session_task: None,
270            event_sender: Mutex::new(sender),
271            stream: Mutex::new(Some(receiver)),
272        })
273    }
274
275    fn running_id(&mut self) -> Option<PeerId> {
276        self.session_task
277            .as_mut()
278            .and_then(|(running, task)| {
279                let mut cx = Context::from_waker(futures::task::noop_waker_ref());
280                // We are the only thing that polls this task, so we are ok to poll it and throw away a
281                // wake.
282                task.poll_unpin(&mut cx).is_pending().then_some(running)
283            })
284            .copied()
285    }
286
    // Clock domain value handed to `SoftStreamConfig` when the input and output devices are
    // created.
    const LOCAL_MONOTONIC_CLOCK_DOMAIN: u32 = 0;

    // Buffer duration handed to `SoftStreamConfig` for both the input and output device.
    // This is currently 2x an SCO frame which holds 7.5ms
    // This must be a multiple of 7.5ms for the CVSD encoder to not have any remainder bytes.
    const AUDIO_BUFFER_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(15);
292
293    fn start_input(&mut self, peer_id: PeerId, codec_id: CodecId) -> Result<AudioFrameSink, Error> {
294        let audio_dev_id = peer_audio_stream_id(peer_id, HF_INPUT_UUID);
295        let (client, sink) = SoftStreamConfig::create_input(
296            &audio_dev_id,
297            "Fuchsia",
298            super::DEVICE_NAME,
299            Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
300            codec_id.try_into()?,
301            Self::AUDIO_BUFFER_DURATION,
302        )
303        .map_err(|e| Error::audio_core(format_err!("Couldn't create input: {e:?}")))?;
304
305        self.audio_core.add_device_by_channel(super::DEVICE_NAME, true, client)?;
306        Ok(sink)
307    }
308
309    fn start_output(
310        &mut self,
311        peer_id: PeerId,
312        codec_id: CodecId,
313    ) -> Result<AudioFrameStream, Error> {
314        let audio_dev_id = peer_audio_stream_id(peer_id, HF_OUTPUT_UUID);
315        let (client, stream) = SoftStreamConfig::create_output(
316            &audio_dev_id,
317            "Fuchsia",
318            super::DEVICE_NAME,
319            Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
320            codec_id.try_into()?,
321            Self::AUDIO_BUFFER_DURATION,
322            zx::MonotonicDuration::from_millis(0),
323        )
324        .map_err(|e| Error::audio_core(format_err!("Couldn't create output: {e:?}")))?;
325        self.audio_core.add_device_by_channel(super::DEVICE_NAME, false, client)?;
326        Ok(stream)
327    }
328}
329
330impl Control for InbandControl {
331    fn start(
332        &mut self,
333        id: PeerId,
334        connection: sco::Connection,
335        codec: CodecId,
336    ) -> Result<(), Error> {
337        if let Some(running) = self.running_id() {
338            if running == id {
339                return Err(Error::AlreadyStarted);
340            }
341            return Err(Error::UnsupportedParameters {
342                source: format_err!("Only one peer can be started inband at once"),
343            });
344        }
345        let frame_sink = self.start_input(id, codec)?;
346        let frame_stream = self.start_output(id, codec)?;
347        let session = AudioSession::setup(
348            connection,
349            codec,
350            frame_sink,
351            frame_stream,
352            self.event_sender.lock().clone(),
353        )?;
354        self.session_task = Some((id, session.start()));
355        Ok(())
356    }
357
358    fn stop(&mut self, id: PeerId) -> Result<(), Error> {
359        if self.running_id() != Some(id) {
360            return Err(Error::NotStarted);
361        }
362        self.session_task = None;
363        let _ = self.event_sender.get_mut().try_send(ControlEvent::Stopped { id, error: None });
364        Ok(())
365    }
366
    /// No-op: both arguments are ignored and nothing is set up at connect time.
    fn connect(&mut self, _id: PeerId, _supported_codecs: &[CodecId]) {
        // Nothing to do here
    }
370
    /// Stops any session running for `id`; the result of `stop` (including `NotStarted`) is
    /// deliberately ignored.
    fn disconnect(&mut self, id: PeerId) {
        let _ = self.stop(id);
    }
374
    /// Hands out the receiving end of the event channel as a boxed stream.
    ///
    /// # Panics
    ///
    /// Panics when called a second time, because the receiver has already been taken.
    fn take_events(&self) -> BoxStream<'static, ControlEvent> {
        self.stream.lock().take().unwrap().boxed()
    }
378
    /// No-op: both arguments are ignored.
    fn failed_request(&self, _request: ControlEvent, _error: Error) {
        // We send no requests, so ignore this.
    }
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387
388    use fidl_fuchsia_bluetooth_bredr::ScoConnectionRequestStream;
389
390    use crate::sco::test_utils::connection_for_codec;
391
    /// A "Zero input response" SBC packet.  This is what SBC encodes to (with the MSBC settings)
    /// when passed a flat input at zero.  Each packet represents 7.5 milliseconds of audio.
    ///
    /// The tests compare only bytes `[2..]` against written packets, because the first two
    /// bytes (the H2 header) differ from packet to packet.
    const ZERO_INPUT_SBC_PACKET: [u8; 60] = [
        0x80, 0x10, 0xad, 0x00, 0x00, 0xc5, 0x00, 0x00, 0x00, 0x00, 0x77, 0x6d, 0xb6, 0xdd, 0xdb,
        0x6d, 0xb7, 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7,
        0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb,
        0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb, 0x6c, 0x00,
    ];
400
    /// A "zero input response" CVSD packet: 60 bytes, each `0x55`.
    const ZERO_INPUT_CVSD_PACKET: [u8; 60] = [0x55; 60];
403
    /// Which kind of SCO request `process_sco_request` handled.
    #[derive(PartialEq, Debug)]
    enum ProcessedRequest {
        /// A `Read` request, answered with the supplied data.
        ScoRead,
        /// A `Write` request, carrying the bytes that were written.
        ScoWrite(Vec<u8>),
    }
409
    // Processes one sco request.  Returns `None` if the stream was ended, otherwise which
    // kind of request was handled.
    //
    // A `Read` is answered with `read_data`; a `Write` is acknowledged and its payload is
    // returned. Anything else (including a stream error) panics.
    async fn process_sco_request(
        sco_request_stream: &mut ScoConnectionRequestStream,
        read_data: Vec<u8>,
    ) -> Option<ProcessedRequest> {
        match sco_request_stream.next().await {
            Some(Ok(bredr::ScoConnectionRequest::Read { responder })) => {
                let response = bredr::ScoConnectionReadResponse {
                    status_flag: Some(bredr::RxPacketStatus::CorrectlyReceivedData),
                    data: Some(read_data),
                    ..Default::default()
                };
                responder.send(&response).expect("sends okay");
                Some(ProcessedRequest::ScoRead)
            }
            Some(Ok(bredr::ScoConnectionRequest::Write { payload, responder })) => {
                responder.send().expect("response to write");
                Some(ProcessedRequest::ScoWrite(payload.data.unwrap()))
            }
            None => None,
            x => panic!("Expected read or write requests, got {x:?}"),
        }
    }
433
    /// Starting a session makes the control read from the SCO connection; a second start for
    /// the same peer is rejected, and after `stop` the request stream drains to its end.
    #[fuchsia::test]
    async fn reads_audio_from_connection() {
        let (proxy, _audio_enumerator_requests) =
            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
        let mut control = InbandControl::create(proxy).unwrap();

        let (connection, mut sco_request_stream) =
            connection_for_codec(PeerId(1), CodecId::MSBC, true);

        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");

        let (connection2, _request_stream) = connection_for_codec(PeerId(1), CodecId::MSBC, true);
        let _ = control
            .start(PeerId(1), connection2, CodecId::MSBC)
            .expect_err("Starting twice shouldn't be allowed");

        // Test note: 10 packets is not enough to force a write to audio, which will stall this test if
        // it's not started.
        for _ in 1..10 {
            assert_eq!(
                Some(ProcessedRequest::ScoRead),
                process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
            );
        }

        control.stop(PeerId(1)).expect("should be able to stop");
        let _ = control.stop(PeerId(1)).expect_err("can't stop a stopped thing");

        // Should be able to drain the requests.
        // The loop ends once `process_sco_request` returns `None` (request stream closed).
        let mut extra_requests = 0;
        while let Some(r) =
            process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
        {
            assert_eq!(ProcessedRequest::ScoRead, r);
            extra_requests += 1;
        }

        info!("Got {extra_requests} extra ScoConnectionProxy Requests after stop");
    }
473
474    #[fuchsia::test]
475    async fn audio_setup_error_bad_codec() {
476        let (proxy, _) =
477            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
478        let mut control = InbandControl::create(proxy).unwrap();
479
480        let (connection, _sco_request_stream) =
481            connection_for_codec(PeerId(1), CodecId::MSBC, true);
482        let res = control.start(PeerId(1), connection, 0xD0u8.into());
483        assert!(res.is_err());
484    }
485
    /// Feeds MSBC packets in through SCO reads and checks that the input device's ring buffer
    /// produces the expected number of position notifications.
    #[fuchsia::test]
    async fn decode_sco_audio_path() {
        use fidl_fuchsia_hardware_audio as audio;
        let (proxy, mut audio_enumerator_requests) =
            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
        let mut control = InbandControl::create(proxy).unwrap();

        let (connection, mut sco_request_stream) =
            connection_for_codec(PeerId(1), CodecId::MSBC, true);

        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");

        // Wait for the input device to be added; an output device seen first is kept alive.
        let audio_input_stream_config;
        let mut _audio_output_stream_config;
        loop {
            match audio_enumerator_requests.next().await {
                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
                    is_input,
                    channel,
                    ..
                })) => {
                    if is_input {
                        audio_input_stream_config = channel.into_proxy();
                        break;
                    } else {
                        _audio_output_stream_config = channel.into_proxy();
                    }
                }
                x => panic!("Expected audio device by channel, got {x:?}"),
            }
        }

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
        audio_input_stream_config
            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
            .expect("create ring buffer");

        // We need to write to the stream at least once to start it up.
        assert_eq!(
            Some(ProcessedRequest::ScoRead),
            process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
        );

        let notifications_per_ring = 20;
        // Request a 1-second audio buffer. This is guaranteed to be greater than 1 second, since
        // the driver must reserve any space it needs ON TOP OF the client-requested 16000 bytes.
        let (frames, _vmo) = ring_buffer
            .get_vmo(16000, notifications_per_ring)
            .await
            .expect("fidl")
            .expect("response");

        // To be deterministic, we set the first notification before even starting the ring-buffer.
        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
        let mut position_notifications = 0;

        let _ = ring_buffer.start().await;

        // For 100 MSBC Audio frames, we get 7.5 x 100 = 750 milliseconds, or 12000 frames.
        // (One packet was already sent above; the loop below sends the remaining 99.)
        let frames_per_notification = frames / notifications_per_ring;
        // As noted above, `frames` > 16000, so `frames_per_notification` > 800. Assuming the ring-
        // buffer is < 17000 frames, `expected_notifications` will be 14.xx (as u32: 14), not 15.
        let expected_notifications = 12000 / frames_per_notification;

        // We might receive the first notification as early as ring-buffer position 0,
        // so we check for a notification before processing the first chunk of data.
        if position_info
            .poll_unpin(&mut Context::from_waker(futures::task::noop_waker_ref()))
            .is_ready()
        {
            position_notifications += 1;
            position_info = ring_buffer.watch_clock_recovery_position_info();
        }
        for _ in 1..100 {
            assert_eq!(
                Some(ProcessedRequest::ScoRead),
                process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
            );
            // We are the only ones polling position_info, so we can ignore wakeups (noop waker).
            if position_info
                .poll_unpin(&mut Context::from_waker(futures::task::noop_waker_ref()))
                .is_ready()
            {
                position_notifications += 1;
                position_info = ring_buffer.watch_clock_recovery_position_info();
            }
        }

        // The audio driver protocol requires notification VALUES [timestamp, position] to tightly
        // correlate. It is less concerned with notification ARRIVAL TIMES; these could occur up to
        // 1 notification's duration early. Thus, if we expect X notifications, then we allow X+1.
        assert!(position_notifications >= expected_notifications);
        assert!(position_notifications <= expected_notifications + 1);
    }
580
    /// Starts the output ring buffer and checks that every packet written to SCO is a 60-byte
    /// MSBC packet whose H2 header cycles through the expected values.
    #[fuchsia::test]
    async fn encode_sco_audio_path_msbc() {
        use fidl_fuchsia_hardware_audio as audio;
        let (proxy, mut audio_enumerator_requests) =
            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
        let mut control = InbandControl::create(proxy).unwrap();

        let (connection, mut sco_request_stream) =
            connection_for_codec(PeerId(1), CodecId::MSBC, true);

        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");

        // Wait for the output device to be added; an input device seen first is kept alive.
        let audio_output_stream_config;
        let mut _audio_input_stream_config;
        loop {
            match audio_enumerator_requests.next().await {
                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
                    is_input,
                    channel,
                    ..
                })) => {
                    if !is_input {
                        audio_output_stream_config = channel.into_proxy();
                        break;
                    } else {
                        _audio_input_stream_config = channel.into_proxy();
                    }
                }
                x => panic!("Expected audio device by channel, got {x:?}"),
            }
        }

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
        audio_output_stream_config
            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
            .unwrap();

        // Note: we don't need to read from the stream to start it, it gets polled automatically by
        // the read task.

        let notifications_per_ring = 20;
        // Request at least 1 second of audio buffer.
        let (_frames, _vmo) = ring_buffer
            .get_vmo(16000, notifications_per_ring)
            .await
            .expect("fidl")
            .expect("response");

        let _ = ring_buffer.start().await;

        // Expect 99 MSBC Audio frames (`1..100`), which should take ~ 750 milliseconds.
        let next_header = &mut [0x01, 0x08];
        for _sco_frame in 1..100 {
            // Answer interleaved reads until the next write shows up.
            'sco: loop {
                match process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec())
                    .await
                {
                    Some(ProcessedRequest::ScoRead) => continue 'sco,
                    Some(ProcessedRequest::ScoWrite(data)) => {
                        assert_eq!(60, data.len());
                        // Skip the H2 header which changes for every packet.
                        assert_eq!(&ZERO_INPUT_SBC_PACKET[2..], &data[2..]);
                        assert_eq!(next_header, &data[0..2]);
                        // Prep for the next header
                        match next_header[1] {
                            0x08 => next_header[1] = 0x38,
                            0x38 => next_header[1] = 0xc8,
                            0xc8 => next_header[1] = 0xf8,
                            0xf8 => next_header[1] = 0x08,
                            _ => unreachable!(),
                        };
                        break 'sco;
                    }
                    x => panic!("Expected read or write but got {x:?}"),
                };
            }
        }
    }
659
    /// Starts the output ring buffer and checks that every packet written to SCO equals the
    /// 60-byte zero-input CVSD packet.
    #[fuchsia::test]
    async fn encode_sco_audio_path_cvsd() {
        use fidl_fuchsia_hardware_audio as audio;
        let (proxy, mut audio_enumerator_requests) =
            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
        let mut control = InbandControl::create(proxy).unwrap();

        let (connection, mut sco_request_stream) =
            connection_for_codec(PeerId(1), CodecId::CVSD, true);

        control.start(PeerId(1), connection, CodecId::CVSD).expect("should be able to start");

        // Wait for the output device to be added; an input device seen first is kept alive.
        let audio_output_stream_config;
        let mut _audio_input_stream_config;
        loop {
            match audio_enumerator_requests.next().await {
                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
                    is_input,
                    channel,
                    ..
                })) => {
                    if !is_input {
                        audio_output_stream_config = channel.into_proxy();
                        break;
                    } else {
                        _audio_input_stream_config = channel.into_proxy();
                    }
                }
                x => panic!("Expected audio device by channel, got {x:?}"),
            }
        }

        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
        audio_output_stream_config
            .create_ring_buffer(&CodecId::CVSD.try_into().unwrap(), server)
            .unwrap();

        // Note: we don't need to read from the stream to start it, it gets polled automatically by
        // the read task.

        let notifications_per_ring = 10;
        // Request at least 1 second of audio buffer.
        let (_frames, _vmo) = ring_buffer
            .get_vmo(64000, notifications_per_ring)
            .await
            .expect("fidl")
            .expect("response");

        let _ = ring_buffer.start().await;

        // Expect 99 CVSD Audio frames (`1..100`), which should take ~ 750 milliseconds.
        for _sco_frame in 1..100 {
            // Answer interleaved reads until the next write shows up.
            'sco: loop {
                match process_sco_request(&mut sco_request_stream, ZERO_INPUT_CVSD_PACKET.to_vec())
                    .await
                {
                    Some(ProcessedRequest::ScoRead) => continue 'sco,
                    Some(ProcessedRequest::ScoWrite(data)) => {
                        // Confirm the data is right
                        assert_eq!(60, data.len());
                        assert_eq!(&ZERO_INPUT_CVSD_PACKET, data.as_slice());
                        break 'sco;
                    }
                    x => panic!("Expected read or write but got {x:?}"),
                };
            }
        }
    }
728
729    #[fuchsia::test]
730    async fn read_from_audio_output() {
731        use fidl_fuchsia_hardware_audio as audio;
732        let (proxy, mut audio_enumerator_requests) =
733            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
734        let mut control = InbandControl::create(proxy).unwrap();
735
736        let (connection, mut sco_request_stream) =
737            connection_for_codec(PeerId(1), CodecId::MSBC, true);
738
739        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
740
741        let audio_output_stream_config;
742        let mut _audio_input_stream_config;
743        loop {
744            match audio_enumerator_requests.next().await {
745                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
746                    is_input,
747                    channel,
748                    ..
749                })) => {
750                    if !is_input {
751                        audio_output_stream_config = channel.into_proxy();
752                        break;
753                    } else {
754                        _audio_input_stream_config = channel.into_proxy();
755                    }
756                }
757                x => panic!("Expected audio device by channel, got {x:?}"),
758            }
759        }
760
761        let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
762        audio_output_stream_config
763            .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
764            .expect("create ring buffer");
765
766        let notifications_per_ring = 20;
767        // Request at least 1 second of audio buffer.
768        let (_frames, _vmo) = ring_buffer
769            .get_vmo(16000, notifications_per_ring)
770            .await
771            .expect("fidl")
772            .expect("response");
773
774        let _ = ring_buffer.start().await;
775
776        // We should be just reading from the audio output, track via position notifications.
777        // 20 position notifications happen in one second.
778        'position_notifications: for i in 1..20 {
779            let mut position_info = ring_buffer.watch_clock_recovery_position_info();
780            loop {
781                let sco_activity = Box::pin(process_sco_request(
782                    &mut sco_request_stream,
783                    ZERO_INPUT_SBC_PACKET.to_vec(),
784                ));
785                use futures::future::Either;
786                match futures::future::select(position_info, sco_activity).await {
787                    Either::Left((result, _sco_fut)) => {
788                        assert!(result.is_ok(), "Position Info failed at {i}");
789                        continue 'position_notifications;
790                    }
791                    Either::Right((_sco_pkt, position_info_fut)) => {
792                        position_info = position_info_fut;
793                    }
794                }
795            }
796        }
797    }
798
799    #[fuchsia::test]
800    async fn audio_output_error_sends_to_events() {
801        let (proxy, mut audio_enumerator_requests) =
802            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
803        let mut control = InbandControl::create(proxy).unwrap();
804        let mut events = control.take_events();
805
806        let (connection, _sco_request_stream) =
807            connection_for_codec(PeerId(1), CodecId::MSBC, true);
808
809        control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
810
811        let audio_output_stream_config;
812        let mut _audio_input_stream_config;
813        loop {
814            match audio_enumerator_requests.next().await {
815                Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
816                    is_input,
817                    channel,
818                    ..
819                })) => {
820                    if !is_input {
821                        audio_output_stream_config = channel.into_proxy();
822                        break;
823                    } else {
824                        _audio_input_stream_config = channel.into_proxy();
825                    }
826                }
827                x => panic!("Expected audio device by channel, got {x:?}"),
828            }
829        }
830
831        drop(audio_output_stream_config);
832
833        // Events should produce an error because there was an issue with audio output.
834        match events.next().await {
835            Some(ControlEvent::Stopped { id, error: Some(_) }) => {
836                assert_eq!(PeerId(1), id);
837            }
838            x => panic!("Expected the peer to have error stop, but got {x:?}"),
839        };
840    }
841}