bt_hfp/audio/
dai.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use fidl_fuchsia_hardware_audio::{self as audio, DaiFormat, PcmFormat};
7use fidl_fuchsia_media as media;
8use fuchsia_audio_dai::{self as dai, DaiAudioDevice, DigitalAudioInterface};
9use fuchsia_bluetooth::types::{peer_audio_stream_id, PeerId};
10use fuchsia_sync::Mutex;
11use futures::stream::BoxStream;
12use futures::StreamExt;
13use log::{info, warn};
14use media::AudioDeviceEnumeratorProxy;
15
16use crate::audio::{HF_INPUT_UUID, HF_OUTPUT_UUID};
17use crate::codec_id::CodecId;
18use crate::sco;
19
20use super::{Control, ControlEvent, Error};
21
22/// Control that connects the Audio start / stop to a Digital Audio Interface
23/// device, which is added to an AudioCore configured Fuchsia Audio system.  The
24/// DAI driver provides the ring buffer and is connected to the controller,
25/// providing audio without encoding in-band.  This Control cannot start audio
26/// without a call in progress.
27pub struct DaiControl {
28    output: DaiAudioDevice,
29    input: DaiAudioDevice,
30    audio_core: media::AudioDeviceEnumeratorProxy,
31    active_connection: Option<sco::Connection>,
32    sender: Mutex<futures::channel::mpsc::Sender<ControlEvent>>,
33}
34
35impl DaiControl {
36    pub async fn discover(proxy: AudioDeviceEnumeratorProxy) -> Result<Self, Error> {
37        let devices = dai::find_devices().await.or(Err(Error::DiscoveryFailed))?;
38        Self::setup(devices, proxy).await
39    }
40
41    // Public for tests within the crate
42    pub(crate) async fn setup(
43        devices: Vec<DigitalAudioInterface>,
44        proxy: media::AudioDeviceEnumeratorProxy,
45    ) -> Result<Self, Error> {
46        let mut input_dai = None;
47        let mut output_dai = None;
48
49        for mut device in devices {
50            if device.connect().is_err() {
51                continue;
52            }
53            let props = match device.properties().await {
54                Err(e) => {
55                    warn!("Couldn't find properties for device: {:?}", e);
56                    continue;
57                }
58                Ok(props) => props,
59            };
60            let dai_input = props.is_input.ok_or(Error::DiscoveryFailed)?;
61            if input_dai.is_none() && dai_input {
62                let dai_device = DaiAudioDevice::build(device).await.map_err(|e| {
63                    warn!("Couldn't build a DAI input audio device: {e:?}");
64                    Error::DiscoveryFailed
65                })?;
66                input_dai = Some(dai_device);
67            } else if output_dai.is_none() && !dai_input {
68                let dai_device = DaiAudioDevice::build(device).await.map_err(|e| {
69                    warn!("Couldn't build a DAI output audio device: {e:?}");
70                    Error::DiscoveryFailed
71                })?;
72                output_dai = Some(dai_device);
73            }
74
75            if input_dai.is_some() && output_dai.is_some() {
76                return Ok(Self::build(proxy, input_dai.unwrap(), output_dai.unwrap()));
77            }
78        }
79
80        info!("Couldn't find the correct combination of DAI devices");
81        Err(Error::DiscoveryFailed)
82    }
83
84    pub fn build(
85        audio_core: media::AudioDeviceEnumeratorProxy,
86        input: DaiAudioDevice,
87        output: DaiAudioDevice,
88    ) -> Self {
89        // Sender will be overwritten when the stream is taken.
90        let (sender, _) = futures::channel::mpsc::channel(0);
91        Self { input, output, audio_core, active_connection: None, sender: Mutex::new(sender) }
92    }
93
94    fn start_device(
95        &mut self,
96        peer_id: &PeerId,
97        input: bool,
98        pcm_format: PcmFormat,
99    ) -> Result<(), anyhow::Error> {
100        let (uuid, dev) = if input {
101            (HF_INPUT_UUID, &mut self.input)
102        } else {
103            (HF_OUTPUT_UUID, &mut self.output)
104        };
105        // TODO(b/358666067): make dai-device choose this format
106        // Use the first supported format for now.
107        let missing_err = format_err!("Couldn't find DAI frame format");
108        let Some(supported) = dev.dai_formats().first() else {
109            return Err(missing_err);
110        };
111        let Some(frame_format) = supported.frame_formats.first().cloned() else {
112            return Err(missing_err);
113        };
114        let dai_format = DaiFormat {
115            number_of_channels: 1,
116            channels_to_use_bitmask: 0x1,
117            sample_format: audio::DaiSampleFormat::PcmSigned,
118            frame_format,
119            frame_rate: pcm_format.frame_rate,
120            bits_per_slot: 16,
121            bits_per_sample: 16,
122        };
123        let dev_id = peer_audio_stream_id(*peer_id, uuid);
124        dev.config(dai_format, pcm_format)?;
125        dev.start(self.audio_core.clone(), "HFP Audio", dev_id, "Fuchsia", "Sapphire HFP Headset")
126    }
127}
128
129impl Control for DaiControl {
130    fn start(
131        &mut self,
132        id: PeerId,
133        connection: sco::Connection,
134        _codec: CodecId,
135    ) -> Result<(), Error> {
136        if self.active_connection.is_some() {
137            // Can only handle one connection at once
138            return Err(Error::AlreadyStarted);
139        }
140        // I/O bandwidth is matched to frame rate but includes both source and sink, so the
141        // audio frame rate is half that.
142        let frame_rate = match connection.params.io_bandwidth {
143            32000 => 16000,
144            16000 => 8000,
145            _ => {
146                return Err(Error::UnsupportedParameters {
147                    source: format_err!("Unsupported frame_rate"),
148                })
149            }
150        };
151        let pcm_format = PcmFormat {
152            number_of_channels: 1,
153            sample_format: audio::SampleFormat::PcmSigned,
154            bytes_per_sample: 2,
155            valid_bits_per_sample: 16,
156            frame_rate,
157        };
158        self.start_device(&id, true, pcm_format.clone()).map_err(Error::audio_core)?;
159        if let Err(e) = self.start_device(&id, false, pcm_format) {
160            // Stop the input device, so we have only two states: started and not started.
161            self.input.stop();
162            return Err(Error::audio_core(e));
163        }
164        self.active_connection = Some(connection);
165        let _ = self.sender.get_mut().try_send(ControlEvent::Started { id });
166        Ok(())
167    }
168
169    fn stop(&mut self, id: PeerId) -> Result<(), Error> {
170        if !self.active_connection.as_ref().is_some_and(|c| c.peer_id == id) {
171            return Err(Error::NotStarted);
172        };
173        self.active_connection = None;
174        self.output.stop();
175        self.input.stop();
176        let _ = self.sender.get_mut().try_send(ControlEvent::Stopped { id, error: None });
177        Ok(())
178    }
179
180    fn connect(&mut self, _id: PeerId, _supported_codecs: &[CodecId]) {
181        // nothing to do here
182    }
183
184    fn disconnect(&mut self, id: PeerId) {
185        let _ = self.stop(id);
186    }
187
188    fn take_events(&self) -> BoxStream<'static, ControlEvent> {
189        let (sender, receiver) = futures::channel::mpsc::channel(1);
190        let mut lock = self.sender.lock();
191        *lock = sender;
192        receiver.boxed()
193    }
194
195    fn failed_request(&self, _request: ControlEvent, _error: Error) {
196        // We do not send requests, so do nothing here.
197    }
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203
204    use fidl::endpoints::Proxy;
205    use fuchsia_async as fasync;
206    use futures::channel::mpsc;
207    use futures::SinkExt;
208
209    use crate::sco::test_utils::connection_for_codec;
210
211    #[fuchsia::test]
212    async fn fails_if_all_devices_not_found() {
213        let (proxy, _requests) =
214            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
215
216        let setup_result = DaiControl::setup(vec![], proxy.clone()).await;
217        assert!(matches!(setup_result, Err(Error::DiscoveryFailed)));
218
219        let (input, _handle) = dai::test::test_digital_audio_interface(true);
220        let setup_result = DaiControl::setup(vec![input], proxy.clone()).await;
221        assert!(matches!(setup_result, Err(Error::DiscoveryFailed)));
222
223        let (output, _handle) = dai::test::test_digital_audio_interface(false);
224        let setup_result = DaiControl::setup(vec![output], proxy.clone()).await;
225        assert!(matches!(setup_result, Err(Error::DiscoveryFailed)));
226
227        let (input, _handle) = dai::test::test_digital_audio_interface(true);
228        let (output, _handle) = dai::test::test_digital_audio_interface(false);
229        let _audio = DaiControl::setup(vec![input, output], proxy.clone()).await.unwrap();
230    }
231
232    const TEST_PEER: PeerId = PeerId(0);
233    const OTHER_PEER: PeerId = PeerId(1);
234
235    #[fuchsia::test]
236    async fn starts_dai() {
237        let (proxy, audio_requests) =
238            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
239
240        let (send, mut new_client_recv) = mpsc::channel(1);
241        let _audio_req_task = fasync::Task::spawn(handle_audio_requests(audio_requests, send));
242
243        let (input, _input_handle) = dai::test::test_digital_audio_interface(true);
244        let (output, _output_handle) = dai::test::test_digital_audio_interface(false);
245        let mut audio = DaiControl::setup(vec![input, output], proxy.clone()).await.unwrap();
246        let mut events = audio.take_events();
247
248        let (connection, _stream) = connection_for_codec(TEST_PEER, CodecId::CVSD, false);
249        let result = audio.start(TEST_PEER, connection, CodecId::CVSD);
250        result.expect("audio should start okay");
251
252        // Expect new audio devices that are output and input.
253        let audio_client_one = new_client_recv.next().await.expect("new audio device");
254        let audio_client_two = new_client_recv.next().await.expect("new audio device");
255        assert!(audio_client_one.is_input != audio_client_two.is_input, "input and output");
256
257        // A started event should have been delivered to indicate it's started.
258        match events.next().await {
259            Some(ControlEvent::Started { id }) => assert_eq!(TEST_PEER, id),
260            x => panic!("Expected a Started event and got {x:?}"),
261        };
262
263        let (connection, mut stream2) = connection_for_codec(TEST_PEER, CodecId::CVSD, false);
264        let result = audio.start(TEST_PEER, connection, CodecId::CVSD);
265        let _ = result.expect_err("Starting an already started source is an error");
266        // Should have dropped the connection since it's already started.
267        let request = stream2.next().await;
268        assert!(request.is_none());
269
270        // Can't start a connection for another peer either, DaiControl only supports one
271        // active peer at a time.
272        let (connection, mut stream2) = connection_for_codec(OTHER_PEER, CodecId::CVSD, false);
273        let result = audio.start(OTHER_PEER, connection, CodecId::CVSD);
274        let _ = result.expect_err("Starting an already started source is an error");
275        // Should have dropped the connection since it's already started.
276        let request = stream2.next().await;
277        assert!(request.is_none());
278    }
279
280    #[fuchsia::test]
281    async fn stop_dai() {
282        let (proxy, audio_requests) =
283            fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
284
285        let (send, mut new_client_recv) = mpsc::channel(1);
286        let _audio_req_task = fasync::Task::spawn(handle_audio_requests(audio_requests, send));
287
288        let (input, input_handle) = dai::test::test_digital_audio_interface(true);
289        let (output, output_handle) = dai::test::test_digital_audio_interface(false);
290        let mut audio = DaiControl::setup(vec![input, output], proxy.clone()).await.unwrap();
291        let mut events = audio.take_events();
292
293        let _ = audio.stop(OTHER_PEER).expect_err("stopping without starting is an error");
294
295        let (connection, _stream) = connection_for_codec(TEST_PEER, CodecId::CVSD, false);
296        let result = audio.start(TEST_PEER, connection, CodecId::CVSD);
297        result.expect("audio should start okay");
298
299        // Expect a new audio devices that we can start.
300        let audio_client_one = new_client_recv.next().await.expect("new audio device");
301        let rb_one = audio_client_one.start_up().await.expect("should be able to start");
302        let _start_time = rb_one.start().await.expect("DAI ringbuffer should start okay");
303        let audio_client_two = new_client_recv.next().await.expect("new audio device");
304        let rb_two = audio_client_two.start_up().await.expect("should be able to start");
305        let _start_time = rb_two.start().await.expect("DAI ringbuffer should start okay");
306
307        assert!(output_handle.is_started(), "Output DAI should be started");
308        assert!(input_handle.is_started(), "Input DAI should be started");
309
310        // A started event should have been delivered to indicate it's started.
311        match events.next().await {
312            Some(ControlEvent::Started { id }) => assert_eq!(TEST_PEER, id),
313            x => panic!("Expected a Started event and got {x:?}"),
314        };
315
316        // Stopping a peer that's not started is an error.
317        let _ =
318            audio.stop(OTHER_PEER).expect_err("shouldn't be able to stop a peer we didn't start");
319
320        // Stopping should close the audio client.
321        audio.stop(TEST_PEER).expect("audio should stop okay");
322        let _audio_closed = audio_client_one.client.on_closed().await;
323        let _audio_closed = audio_client_two.client.on_closed().await;
324
325        // and send a stopped for successful stops.
326        match events.next().await {
327            Some(ControlEvent::Stopped { id, error: None }) => {
328                assert_eq!(TEST_PEER, id);
329            }
330            x => panic!("Expected a non-error Stopped event and got {x:?}"),
331        };
332
333        let _ = audio.stop(TEST_PEER).expect_err("audio should not be able to stop after stopping");
334    }
335
336    struct TestAudioClient {
337        _name: String,
338        is_input: bool,
339        client: audio::StreamConfigProxy,
340    }
341
342    impl TestAudioClient {
343        async fn start_up(&self) -> Result<audio::RingBufferProxy, fidl::Error> {
344            let _prop = self.client.get_properties().await?;
345            let formats = self.client.get_supported_formats().await?;
346            // Pick the first one, why not.
347            let pcm_formats = formats.first().unwrap().pcm_supported_formats.as_ref().unwrap();
348            let pcm_format = Some(PcmFormat {
349                number_of_channels: pcm_formats.channel_sets.as_ref().unwrap()[0]
350                    .attributes
351                    .as_ref()
352                    .unwrap()
353                    .len() as u8,
354                sample_format: pcm_formats.sample_formats.as_ref().unwrap()[0],
355                bytes_per_sample: pcm_formats.bytes_per_sample.as_ref().unwrap()[0],
356                valid_bits_per_sample: pcm_formats.valid_bits_per_sample.as_ref().unwrap()[0],
357                frame_rate: pcm_formats.frame_rates.as_ref().unwrap()[0],
358            });
359            let (proxy, server_end) = fidl::endpoints::create_proxy();
360            self.client.create_ring_buffer(
361                &audio::Format { pcm_format, ..Default::default() },
362                server_end,
363            )?;
364            Ok(proxy)
365        }
366    }
367
368    async fn handle_audio_requests(
369        mut requests: media::AudioDeviceEnumeratorRequestStream,
370        mut stream_proxies: mpsc::Sender<TestAudioClient>,
371    ) {
372        while let Some(req) = requests.next().await {
373            match req.expect("AudioDeviceEnumerator stream error: {:?}") {
374                media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
375                    device_name,
376                    is_input,
377                    channel,
378                    ..
379                } => {
380                    let dev = TestAudioClient {
381                        _name: device_name.to_owned(),
382                        is_input,
383                        client: channel.into_proxy(),
384                    };
385                    if let Err(e) = stream_proxies.feed(dev).await {
386                        panic!("Couldn't send new device: {:?}", e);
387                    }
388                }
389                x => unimplemented!("Got unimplemented AudioDeviceEnumerator: {:?}", x),
390            }
391        }
392    }
393}