bt_hfp/audio/
codec.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::anyhow;
6use fuchsia_audio_device::codec;
7use fuchsia_audio_device::codec::CodecRequest;
8use fuchsia_bluetooth::types::{peer_audio_stream_id, PeerId};
9use fuchsia_sync::Mutex;
10use futures::stream::BoxStream;
11use futures::{SinkExt, StreamExt};
12use log::{info, warn};
13use std::sync::Arc;
14use {
15    fidl_fuchsia_audio_device as audio_device, fidl_fuchsia_hardware_audio as audio,
16    fuchsia_async as fasync,
17};
18
19use super::{Control, ControlEvent, Error, HF_INPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23#[derive(Default)]
24struct CodecControlInner {
25    start_request:
26        Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
27    stop_request:
28        Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
29}
30
31// Control that is connected to a Codec device registered with an
32/// AudioDeviceRegistry component.  The AudioDeviceRegistry can request that we
33/// start and/or stop the audio in-band which will send the request on to the
34/// HFP task to initiate Audio Connection Setup when no call is in progress.
35pub struct CodecControl {
36    provider: audio_device::ProviderProxy,
37    codec_task: Option<fasync::Task<()>>,
38    events_sender: futures::channel::mpsc::Sender<ControlEvent>,
39    events_receiver: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
40    codec_id: Option<CodecId>,
41    connection: Option<sco::Connection>,
42    connected_peer: Option<PeerId>,
43    inner: Arc<Mutex<CodecControlInner>>,
44}
45
46impl Control for CodecControl {
47    fn start(
48        &mut self,
49        id: PeerId,
50        connection: sco::Connection,
51        codec: crate::codec_id::CodecId,
52    ) -> Result<(), Error> {
53        if self.connection.is_some() {
54            return Err(Error::AlreadyStarted);
55        }
56        if Some(codec) != self.codec_id {
57            return Err(Error::UnsupportedParameters {
58                source: anyhow!("CodecId must match connected CodecId"),
59            });
60        }
61        if Some(id) != self.connected_peer {
62            return Err(Error::UnsupportedParameters {
63                source: anyhow!("Can't start a non-connected peer"),
64            });
65        };
66        let Some(start_request) = self.inner.lock().start_request.take() else {
67            return Err(Error::UnsupportedParameters {
68                source: anyhow!("Can only start in response to request"),
69            });
70        };
71        self.connection = Some(connection);
72        start_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
73        Ok(())
74    }
75
76    fn stop(&mut self, id: PeerId) -> Result<(), Error> {
77        if self.connection.is_none() {
78            return Err(Error::NotStarted);
79        }
80        if Some(id) != self.connected_peer {
81            return Err(Error::UnsupportedParameters {
82                source: anyhow!("Can't stop a non-connected peer"),
83            });
84        }
85        let Some(stop_request) = self.inner.lock().stop_request.take() else {
86            return Err(Error::UnsupportedParameters {
87                source: anyhow!("Can only stop in response to request"),
88            });
89        };
90        self.connection = None;
91        stop_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
92        Ok(())
93    }
94
95    fn connect(&mut self, id: PeerId, supported_codecs: &[CodecId]) {
96        let supported_formats: audio::DaiSupportedFormats;
97        if supported_codecs.contains(&CodecId::MSBC) {
98            self.codec_id = Some(CodecId::MSBC);
99            supported_formats = CodecId::MSBC.try_into().unwrap();
100        } else {
101            self.codec_id = Some(CodecId::CVSD);
102            supported_formats = CodecId::CVSD.try_into().unwrap();
103        };
104        let audio_dev_id = peer_audio_stream_id(id, HF_INPUT_UUID);
105        let (codec, client) = codec::SoftCodec::create(
106            Some(&audio_dev_id),
107            "Fuchsia",
108            super::DEVICE_NAME,
109            codec::CodecDirection::Duplex,
110            supported_formats.clone(),
111            true,
112        );
113        self.codec_task = Some(fasync::Task::local(codec_task(
114            id,
115            self.provider.clone(),
116            codec,
117            supported_formats,
118            client,
119            self.events_sender.clone(),
120            self.inner.clone(),
121        )));
122        self.connected_peer = Some(id);
123    }
124
125    fn disconnect(&mut self, _id: PeerId) {
126        self.codec_task = None;
127        self.connected_peer = None;
128    }
129
130    fn take_events(&self) -> BoxStream<'static, ControlEvent> {
131        self.events_receiver.lock().take().unwrap().boxed()
132    }
133
134    fn failed_request(&self, request: ControlEvent, _error: Error) {
135        match request {
136            ControlEvent::RequestStart { id: _ } => {
137                let Some(start_request) = self.inner.lock().start_request.take() else {
138                    return;
139                };
140                start_request(Err(zx::Status::INTERNAL));
141            }
142            ControlEvent::RequestStop { id: _ } => {
143                let Some(stop_request) = self.inner.lock().start_request.take() else {
144                    return;
145                };
146                stop_request(Err(zx::Status::INTERNAL));
147            }
148            _ => unreachable!(),
149        }
150    }
151}
152
153async fn codec_task(
154    id: PeerId,
155    provider: audio_device::ProviderProxy,
156    mut codec: codec::SoftCodec,
157    supported_formats: audio::DaiSupportedFormats,
158    client: fidl::endpoints::ClientEnd<audio::CodecMarker>,
159    mut event_sender: futures::channel::mpsc::Sender<ControlEvent>,
160    inner: Arc<Mutex<CodecControlInner>>,
161) {
162    let result = provider
163        .add_device(audio_device::ProviderAddDeviceRequest {
164            device_name: Some(super::DEVICE_NAME.into()),
165            device_type: Some(audio_device::DeviceType::Codec),
166            driver_client: Some(audio_device::DriverClient::Codec(client)),
167            ..Default::default()
168        })
169        .await;
170    match result {
171        Err(e) => {
172            warn!("FIDL Error adding device: {e:?}");
173            return;
174        }
175        Ok(Err(e)) => {
176            warn!("Failed to add device: {e:?}");
177            return;
178        }
179        Ok(Ok(_)) => {}
180    };
181    info!("Added Codec device!");
182    while let Some(event) = codec.next().await {
183        let Ok(event) = event else {
184            let _ = event_sender
185                .send(ControlEvent::Stopped {
186                    id,
187                    error: Some(Error::audio_core(event.err().unwrap().into())),
188                })
189                .await;
190            return;
191        };
192        info!("Codec request: {event:?}");
193        let audio_event = match event {
194            CodecRequest::SetFormat { format, responder } => {
195                if supported_formats.number_of_channels.contains(&format.number_of_channels)
196                    && supported_formats.frame_formats.contains(&format.frame_format)
197                    && supported_formats.sample_formats.contains(&format.sample_format)
198                    && supported_formats.frame_rates.contains(&format.frame_rate)
199                    && supported_formats.bits_per_slot.contains(&format.bits_per_slot)
200                    && supported_formats.bits_per_sample.contains(&format.bits_per_sample)
201                {
202                    responder(Ok(()));
203                } else {
204                    responder(Err(zx::Status::NOT_SUPPORTED));
205                }
206                continue;
207            }
208            CodecRequest::Start { responder } => {
209                if inner.lock().start_request.is_some() {
210                    responder(Err(zx::Status::ALREADY_EXISTS));
211                    continue;
212                }
213                inner.lock().start_request = Some(responder);
214                ControlEvent::RequestStart { id }
215            }
216            CodecRequest::Stop { responder } => {
217                if inner.lock().stop_request.is_some() {
218                    responder(Err(zx::Status::ALREADY_EXISTS));
219                    continue;
220                }
221                inner.lock().stop_request = Some(responder);
222                ControlEvent::RequestStop { id }
223            }
224        };
225        let _ = event_sender.send(audio_event).await;
226    }
227    warn!("Codec device finished, dropping..!");
228}
229
230impl CodecControl {
231    pub fn new(provider: audio_device::ProviderProxy) -> Self {
232        let (events_sender, receiver) = futures::channel::mpsc::channel(1);
233        Self {
234            provider,
235            codec_task: None,
236            events_sender,
237            events_receiver: Mutex::new(Some(receiver)),
238            inner: Default::default(),
239            codec_id: None,
240            connection: None,
241            connected_peer: None,
242        }
243    }
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    use fidl::endpoints::Proxy;
    use fixture::fixture;
    use futures::task::{Context, Poll};
    use futures::FutureExt;

    use crate::sco::test_utils::connection_for_codec;

    /// Fixture: creates a `CodecControl`, connects `PeerId(1)` with MSBC support,
    /// accepts the resulting `AddDevice` request and hands the codec proxy plus the
    /// control to `test`.
    async fn codec_setup_connected<F, Fut>(_test_name: &str, test: F)
    where
        F: FnOnce(audio::CodecProxy, CodecControl) -> Fut,
        Fut: futures::Future<Output = ()>,
    {
        let (provider_proxy, mut provider_requests) =
            fidl::endpoints::create_proxy_and_stream::<audio_device::ProviderMarker>();
        let mut codec = CodecControl::new(provider_proxy);

        codec.connect(PeerId(1), &[CodecId::MSBC]);

        let Some(Ok(audio_device::ProviderRequest::AddDevice {
            payload:
                audio_device::ProviderAddDeviceRequest {
                    driver_client: Some(client),
                    device_name: Some(_name),
                    device_type: Some(device_type),
                    ..
                },
            responder,
        })) = provider_requests.next().await
        else {
            panic!("Expected a request from the connect");
        };

        assert_eq!(device_type, audio_device::DeviceType::Codec);

        responder.send(Ok(&Default::default())).expect("response to succeed");

        let audio_device::DriverClient::Codec(codec_client) = client else {
            panic!("Should have provided a codec client");
        };

        let codec_proxy = codec_client.into_proxy();

        test(codec_proxy, codec).await
    }

    /// After connecting, the codec device answers property and format queries and
    /// advertises the MSBC frame rate.
    #[fixture(codec_setup_connected)]
    #[fuchsia::test]
    async fn publishes_on_connect(codec_client: audio::CodecProxy, codec: CodecControl) {
        let _properties = codec_client.get_properties().await.unwrap();
        let audio::CodecGetDaiFormatsResult::Ok(formats) =
            codec_client.get_dai_formats().await.unwrap()
        else {
            panic!("Expected formats from get_dai_formats");
        };

        assert_eq!(formats.len(), 1);
        // MSBC has a frame-rate of 16khz
        assert_eq!(formats[0].frame_rates[0], 16000);
        drop(codec);
    }

    /// Disconnecting the peer closes the codec channel.
    #[fixture(codec_setup_connected)]
    #[fuchsia::test]
    async fn removed_on_disconnect(codec_client: audio::CodecProxy, mut codec: CodecControl) {
        codec.disconnect(PeerId(1));
        let _ = codec_client.on_closed().await;
    }

    /// `start` only succeeds in response to a codec Start request, with the connected
    /// codec, and only once.
    #[fixture(codec_setup_connected)]
    #[fuchsia::test]
    async fn start_request_lifetime(codec_client: audio::CodecProxy, mut codec: CodecControl) {
        let mut event_stream = codec.take_events();
        // start without a request should fail
        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
        let start_result = codec.start(PeerId(1), connection, CodecId::MSBC);
        let Err(Error::UnsupportedParameters { .. }) = start_result else {
            panic!("Expected error from start before request");
        };

        // request comes in
        let mut start_fut = codec_client.start();
        let (waker, wake_count) = futures_test::task::new_count_waker();
        let Poll::Pending = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
            panic!("Expected start to be pending");
        };

        let Some(ControlEvent::RequestStart { id }) = event_stream.next().await else {
            panic!("Expected start request from event stream");
        };
        assert_eq!(id, PeerId(1));

        // starting with a non-MSBC codec fails, and doesn't complete the future.
        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::CVSD, false);
        let start_result = codec.start(PeerId(1), connection, CodecId::CVSD);
        let Err(Error::UnsupportedParameters { .. }) = start_result else {
            panic!("Expected error from start with a mismatched codec");
        };
        assert_eq!(wake_count.get(), 0);

        // starting after works and then completes the request.
        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
        codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");

        let Poll::Ready(_) = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
            panic!("Expected to get response back from start");
        };

        // Starting after started is no good either.
        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
        let start_result = codec.start(PeerId(1), connection, CodecId::MSBC);
        let Err(Error::AlreadyStarted) = start_result else {
            panic!("Expected error from start while started");
        };
    }

    /// `stop` only succeeds while started, in response to a codec Stop request, and
    /// for the connected peer.
    #[fixture(codec_setup_connected)]
    #[fuchsia::test]
    async fn stop_request_lifetime(codec_client: audio::CodecProxy, mut codec: CodecControl) {
        let mut event_stream = codec.take_events();
        // can't stop before we are started
        let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
            panic!("Expected to not be able to stop when not started");
        };

        // request comes in
        let start_fut = codec_client.start();

        let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
            panic!("Expected start request from event stream");
        };

        // starting after works and then completes the request.
        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
        codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");
        let _ = start_fut.await.expect("start to succeed");

        // can't stop without a request
        let Err(Error::UnsupportedParameters { .. }) = codec.stop(PeerId(1)) else {
            panic!("expected to not be able to stop without a request");
        };

        // request to stop comes in
        let mut stop_fut = codec_client.stop();
        let (waker, _wake_count) = futures_test::task::new_count_waker();
        let Poll::Pending = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
            panic!("Expected stop to be pending");
        };

        let Some(ControlEvent::RequestStop { id }) = event_stream.next().await else {
            panic!("Expected stop request from event stream");
        };
        assert_eq!(id, PeerId(1));

        // Can't stop a peer that isn't the connected one.
        let _ = codec.stop(PeerId(2)).expect_err("shouldn't be able to stop a different peer");

        // can stop the one requested
        codec.stop(PeerId(1)).expect("should be able to stop");

        let Poll::Ready(_) = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
            panic!("Expected to get response back from stop");
        };

        // once stopped, stopping again is rejected
        let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
            panic!("Expected to not be able to stop when not started");
        };
    }
}
418}