1use anyhow::format_err;
6use fuchsia_audio_codec::{StreamProcessor, StreamProcessorOutputStream};
7use fuchsia_audio_device::stream_config::SoftStreamConfig;
8use fuchsia_audio_device::{AudioFrameSink, AudioFrameStream};
9use fuchsia_bluetooth::types::{PeerId, peer_audio_stream_id};
10use fuchsia_sync::Mutex;
11use futures::stream::BoxStream;
12use futures::task::Context;
13use futures::{AsyncWriteExt, FutureExt, StreamExt};
14use log::{error, info, warn};
15use media::AudioDeviceEnumeratorProxy;
16use std::pin::pin;
17use {fidl_fuchsia_bluetooth_bredr as bredr, fidl_fuchsia_media as media, fuchsia_async as fasync};
18
19use crate::audio::{Control, ControlEvent, Error, HF_INPUT_UUID, HF_OUTPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23pub struct InbandControl {
26 audio_core: media::AudioDeviceEnumeratorProxy,
27 session_task: Option<(PeerId, fasync::Task<()>)>,
28 event_sender: Mutex<futures::channel::mpsc::Sender<ControlEvent>>,
29 stream: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
30}
31
32struct AudioSession {
37 audio_frame_sink: AudioFrameSink,
38 audio_frame_stream: AudioFrameStream,
39 sco: sco::Connection,
40 codec: CodecId,
41 decoder: StreamProcessor,
42 encoder: StreamProcessor,
43 event_sender: futures::channel::mpsc::Sender<ControlEvent>,
44}
45
46impl AudioSession {
47 fn setup(
48 connection: sco::Connection,
49 codec: CodecId,
50 audio_frame_sink: AudioFrameSink,
51 audio_frame_stream: AudioFrameStream,
52 event_sender: futures::channel::mpsc::Sender<ControlEvent>,
53 ) -> Result<Self, Error> {
54 if !codec.is_supported() {
55 return Err(Error::UnsupportedParameters {
56 source: format_err!("unsupported codec {codec}"),
57 });
58 }
59 let decoder = StreamProcessor::create_decoder(codec.mime_type()?, Some(codec.oob_bytes()))
60 .map_err(|e| Error::audio_core(format_err!("creating decoder: {e:?}")))?;
61 let encoder = StreamProcessor::create_encoder(codec.try_into()?, codec.try_into()?)
62 .map_err(|e| Error::audio_core(format_err!("creating encoder: {e:?}")))?;
63 Ok(Self {
64 sco: connection,
65 decoder,
66 encoder,
67 audio_frame_sink,
68 audio_frame_stream,
69 codec,
70 event_sender,
71 })
72 }
73
74 async fn encoder_to_sco(
75 mut encoded_stream: StreamProcessorOutputStream,
76 proxy: bredr::ScoConnectionProxy,
77 codec: CodecId,
78 ) -> Error {
79 let packet: Vec<u8> = vec![0; 60]; let mut request =
82 bredr::ScoConnectionWriteRequest { data: Some(packet), ..Default::default() };
83
84 const MSBC_ENCODED_LEN: usize = 57; if codec == CodecId::MSBC {
86 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
87 packet[0] = 0x01; }
90 let mut h2_marker = [0x08u8, 0x38, 0xc8, 0xf8].iter().cycle();
92 loop {
93 match encoded_stream.next().await {
94 Some(Ok(encoded)) => {
95 if codec == CodecId::MSBC {
96 if encoded.len() % MSBC_ENCODED_LEN != 0 {
97 warn!("Got {} bytes, uneven number of packets", encoded.len());
98 }
99 for sbc_packet in encoded.as_slice().chunks_exact(MSBC_ENCODED_LEN) {
100 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
101 packet[1] = *h2_marker.next().unwrap();
102 packet[2..59].copy_from_slice(sbc_packet);
103 if let Err(e) = proxy.write(&request).await {
104 return e.into();
105 }
106 }
107 } else {
108 for cvsd_packet in encoded.as_slice().chunks_exact(60) {
111 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
112 packet.copy_from_slice(cvsd_packet);
113 if let Err(e) = proxy.write(&request).await {
114 return e.into();
115 }
116 }
117 }
118 }
119 Some(Err(e)) => {
120 warn!("Error in encoding: {e:?}");
121 return Error::audio_core(format_err!("Couldn't read encoded: {e:?}"));
122 }
123 None => {
124 warn!("Error in encoding: Stream is ended!");
125 return Error::audio_core(format_err!("Encoder stream ended early"));
126 }
127 }
128 }
129 }
130
131 async fn pcm_to_encoder(mut encoder: StreamProcessor, mut stream: AudioFrameStream) -> Error {
132 loop {
133 match stream.next().await {
134 Some(Ok(pcm)) => {
135 if let Err(e) = encoder.write_all(pcm.as_slice()).await {
136 return Error::audio_core(format_err!("write to encoder: {e:?}"));
137 }
138 if let Err(e) = encoder.flush().await {
140 return Error::audio_core(format_err!("flush encoder: {e:?}"));
141 }
142 }
143 Some(Err(e)) => {
144 warn!("Audio output error: {e:?}");
145 return Error::audio_core(format_err!("output error: {e:?}"));
146 }
147 None => {
148 warn!("Ran out of audio input!");
149 return Error::audio_core(format_err!("Audio input end"));
150 }
151 }
152 }
153 }
154
155 async fn decoder_to_pcm(
156 mut decoded_stream: StreamProcessorOutputStream,
157 mut sink: AudioFrameSink,
158 ) -> Error {
159 let mut decoded_packets = 0;
160 loop {
161 match decoded_stream.next().await {
162 Some(Ok(decoded)) => {
163 decoded_packets += 1;
164 if decoded_packets % 500 == 0 {
165 info!(
166 "Got {} decoded bytes from decoder: {decoded_packets} packets",
167 decoded.len()
168 );
169 }
170 if let Err(e) = sink.write_all(decoded.as_slice()).await {
171 warn!("Error sending to sink: {e:?}");
172 return Error::audio_core(format_err!("send to sink: {e:?}"));
173 }
174 }
175 Some(Err(e)) => {
176 warn!("Error in decoding: {e:?}");
177 return Error::audio_core(format_err!("Couldn't read decoder: {e:?}"));
178 }
179 None => {
180 warn!("Error in decoding: Stream is ended!");
181 return Error::audio_core(format_err!("Decoder stream ended early"));
182 }
183 }
184 }
185 }
186
187 async fn sco_to_decoder(
188 proxy: bredr::ScoConnectionProxy,
189 mut decoder: StreamProcessor,
190 codec: CodecId,
191 ) -> Error {
192 loop {
193 let data = match proxy.read().await {
194 Ok(bredr::ScoConnectionReadResponse { data: Some(data), .. }) => data,
195 Ok(_) => return Error::audio_core(format_err!("Invalid Read response")),
196 Err(e) => return e.into(),
197 };
198 let packet = match codec {
199 CodecId::CVSD => data.as_slice(),
200 CodecId::MSBC => {
201 let (_header, packet) = data.as_slice().split_at(2);
203 if packet[0] != 0xad {
204 info!(
205 "Packet didn't start with syncword: {:#02x} {}",
206 packet[0],
207 packet.len()
208 );
209 }
210 packet
211 }
212 _ => {
213 return Error::UnsupportedParameters {
214 source: format_err!("Unknown CodecId: {codec:?}"),
215 };
216 }
217 };
218 if let Err(e) = decoder.write_all(packet).await {
219 return Error::audio_core(format_err!("Failed to write to decoder: {e:?}"));
220 }
221 if let Err(e) = decoder.flush().await {
224 return Error::audio_core(format_err!("Failed to flush decoder: {e:?}"));
225 }
226 }
227 }
228
229 async fn run(mut self) {
230 let peer_id = self.sco.peer_id;
231 let Ok(encoded_stream) = self.encoder.take_output_stream() else {
232 error!("Couldn't take encoder output stream");
233 return;
234 };
235 let sco_write =
236 AudioSession::encoder_to_sco(encoded_stream, self.sco.proxy.clone(), self.codec);
237 let sco_write = pin!(sco_write);
238 let audio_to_encoder = AudioSession::pcm_to_encoder(self.encoder, self.audio_frame_stream);
239 let audio_to_encoder = pin!(audio_to_encoder);
240
241 let Ok(decoded_stream) = self.decoder.take_output_stream() else {
242 error!("Couldn't take decoder output stream");
243 return;
244 };
245 let decoder_to_sink =
246 pin!(AudioSession::decoder_to_pcm(decoded_stream, self.audio_frame_sink));
247 let sco_read =
248 AudioSession::sco_to_decoder(self.sco.proxy.clone(), self.decoder, self.codec);
249 let sco_read = pin!(sco_read);
250 let e = futures::select! {
251 e = audio_to_encoder.fuse() => { warn!(e:?; "PCM to encoder write"); e},
252 e = sco_write.fuse() => { warn!(e:?; "Write encoded to SCO"); e},
253 e = sco_read.fuse() => { warn!(e:?; "SCO read to decoder"); e},
254 e = decoder_to_sink.fuse() => { warn!(e:?; "SCO decoder to PCM"); e},
255 };
256 let _ = self.event_sender.try_send(ControlEvent::Stopped { id: peer_id, error: Some(e) });
257 }
258
259 fn start(self) -> fasync::Task<()> {
260 fasync::Task::spawn(self.run())
261 }
262}
263
264impl InbandControl {
265 pub fn create(proxy: AudioDeviceEnumeratorProxy) -> Result<Self, Error> {
266 let (sender, receiver) = futures::channel::mpsc::channel(1);
267 Ok(Self {
268 audio_core: proxy,
269 session_task: None,
270 event_sender: Mutex::new(sender),
271 stream: Mutex::new(Some(receiver)),
272 })
273 }
274
275 fn running_id(&mut self) -> Option<PeerId> {
276 self.session_task
277 .as_mut()
278 .and_then(|(running, task)| {
279 let mut cx = Context::from_waker(&std::task::Waker::noop());
280 task.poll_unpin(&mut cx).is_pending().then_some(running)
283 })
284 .copied()
285 }
286
287 const LOCAL_MONOTONIC_CLOCK_DOMAIN: u32 = 0;
288
289 const AUDIO_BUFFER_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(15);
292
293 fn start_input(&mut self, peer_id: PeerId, codec_id: CodecId) -> Result<AudioFrameSink, Error> {
294 let audio_dev_id = peer_audio_stream_id(peer_id, HF_INPUT_UUID);
295 let (client, sink) = SoftStreamConfig::create_input(
296 &audio_dev_id,
297 "Fuchsia",
298 super::DEVICE_NAME,
299 Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
300 codec_id.try_into()?,
301 Self::AUDIO_BUFFER_DURATION,
302 )
303 .map_err(|e| Error::audio_core(format_err!("Couldn't create input: {e:?}")))?;
304
305 self.audio_core.add_device_by_channel(super::DEVICE_NAME, true, client)?;
306 Ok(sink)
307 }
308
309 fn start_output(
310 &mut self,
311 peer_id: PeerId,
312 codec_id: CodecId,
313 ) -> Result<AudioFrameStream, Error> {
314 let audio_dev_id = peer_audio_stream_id(peer_id, HF_OUTPUT_UUID);
315 let (client, stream) = SoftStreamConfig::create_output(
316 &audio_dev_id,
317 "Fuchsia",
318 super::DEVICE_NAME,
319 Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
320 codec_id.try_into()?,
321 Self::AUDIO_BUFFER_DURATION,
322 zx::MonotonicDuration::from_millis(0),
323 )
324 .map_err(|e| Error::audio_core(format_err!("Couldn't create output: {e:?}")))?;
325 self.audio_core.add_device_by_channel(super::DEVICE_NAME, false, client)?;
326 Ok(stream)
327 }
328}
329
330impl Control for InbandControl {
331 fn start(
332 &mut self,
333 id: PeerId,
334 connection: sco::Connection,
335 codec: CodecId,
336 ) -> Result<(), Error> {
337 if let Some(running) = self.running_id() {
338 if running == id {
339 return Err(Error::AlreadyStarted);
340 }
341 return Err(Error::UnsupportedParameters {
342 source: format_err!("Only one peer can be started inband at once"),
343 });
344 }
345 let frame_sink = self.start_input(id, codec)?;
346 let frame_stream = self.start_output(id, codec)?;
347 let session = AudioSession::setup(
348 connection,
349 codec,
350 frame_sink,
351 frame_stream,
352 self.event_sender.lock().clone(),
353 )?;
354 self.session_task = Some((id, session.start()));
355 Ok(())
356 }
357
358 fn stop(&mut self, id: PeerId) -> Result<(), Error> {
359 if self.running_id() != Some(id) {
360 return Err(Error::NotStarted);
361 }
362 self.session_task = None;
363 let _ = self.event_sender.get_mut().try_send(ControlEvent::Stopped { id, error: None });
364 Ok(())
365 }
366
367 fn connect(&mut self, _id: PeerId, _supported_codecs: &[CodecId]) {
368 }
370
371 fn disconnect(&mut self, id: PeerId) {
372 let _ = self.stop(id);
373 }
374
375 fn take_events(&self) -> BoxStream<'static, ControlEvent> {
376 self.stream.lock().take().unwrap().boxed()
377 }
378
379 fn failed_request(&self, _request: ControlEvent, _error: Error) {
380 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 use fidl_fuchsia_bluetooth_bredr::ScoConnectionRequestStream;
389
390 use crate::sco::test_utils::connection_for_codec;
391
392 const ZERO_INPUT_SBC_PACKET: [u8; 60] = [
395 0x80, 0x10, 0xad, 0x00, 0x00, 0xc5, 0x00, 0x00, 0x00, 0x00, 0x77, 0x6d, 0xb6, 0xdd, 0xdb,
396 0x6d, 0xb7, 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7,
397 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb,
398 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb, 0x6c, 0x00,
399 ];
400
401 const ZERO_INPUT_CVSD_PACKET: [u8; 60] = [0x55; 60];
403
404 #[derive(PartialEq, Debug)]
405 enum ProcessedRequest {
406 ScoRead,
407 ScoWrite(Vec<u8>),
408 }
409
410 async fn process_sco_request(
412 sco_request_stream: &mut ScoConnectionRequestStream,
413 read_data: Vec<u8>,
414 ) -> Option<ProcessedRequest> {
415 match sco_request_stream.next().await {
416 Some(Ok(bredr::ScoConnectionRequest::Read { responder })) => {
417 let response = bredr::ScoConnectionReadResponse {
418 status_flag: Some(bredr::RxPacketStatus::CorrectlyReceivedData),
419 data: Some(read_data),
420 ..Default::default()
421 };
422 responder.send(&response).expect("sends okay");
423 Some(ProcessedRequest::ScoRead)
424 }
425 Some(Ok(bredr::ScoConnectionRequest::Write { payload, responder })) => {
426 responder.send().expect("response to write");
427 Some(ProcessedRequest::ScoWrite(payload.data.unwrap()))
428 }
429 None => None,
430 x => panic!("Expected read or write requests, got {x:?}"),
431 }
432 }
433
434 #[fuchsia::test]
435 async fn reads_audio_from_connection() {
436 let (proxy, _audio_enumerator_requests) =
437 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
438 let mut control = InbandControl::create(proxy).unwrap();
439
440 let (connection, mut sco_request_stream) =
441 connection_for_codec(PeerId(1), CodecId::MSBC, true);
442
443 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
444
445 let (connection2, _request_stream) = connection_for_codec(PeerId(1), CodecId::MSBC, true);
446 let _ = control
447 .start(PeerId(1), connection2, CodecId::MSBC)
448 .expect_err("Starting twice shouldn't be allowed");
449
450 for _ in 1..10 {
453 assert_eq!(
454 Some(ProcessedRequest::ScoRead),
455 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
456 );
457 }
458
459 control.stop(PeerId(1)).expect("should be able to stop");
460 let _ = control.stop(PeerId(1)).expect_err("can't stop a stopped thing");
461
462 let mut extra_requests = 0;
464 while let Some(r) =
465 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
466 {
467 assert_eq!(ProcessedRequest::ScoRead, r);
468 extra_requests += 1;
469 }
470
471 info!("Got {extra_requests} extra ScoConnectionProxy Requests after stop");
472 }
473
474 #[fuchsia::test]
475 async fn audio_setup_error_bad_codec() {
476 let (proxy, _) =
477 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
478 let mut control = InbandControl::create(proxy).unwrap();
479
480 let (connection, _sco_request_stream) =
481 connection_for_codec(PeerId(1), CodecId::MSBC, true);
482 let res = control.start(PeerId(1), connection, 0xD0u8.into());
483 assert!(res.is_err());
484 }
485
486 #[fuchsia::test]
487 async fn decode_sco_audio_path() {
488 use fidl_fuchsia_hardware_audio as audio;
489 let (proxy, mut audio_enumerator_requests) =
490 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
491 let mut control = InbandControl::create(proxy).unwrap();
492
493 let (connection, mut sco_request_stream) =
494 connection_for_codec(PeerId(1), CodecId::MSBC, true);
495
496 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
497
498 let audio_input_stream_config;
499 let mut _audio_output_stream_config;
500 loop {
501 match audio_enumerator_requests.next().await {
502 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
503 is_input,
504 channel,
505 ..
506 })) => {
507 if is_input {
508 audio_input_stream_config = channel.into_proxy();
509 break;
510 } else {
511 _audio_output_stream_config = channel.into_proxy();
512 }
513 }
514 x => panic!("Expected audio device by channel, got {x:?}"),
515 }
516 }
517
518 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
519 audio_input_stream_config
520 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
521 .expect("create ring buffer");
522
523 assert_eq!(
525 Some(ProcessedRequest::ScoRead),
526 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
527 );
528
529 let notifications_per_ring = 20;
530 let (frames, _vmo) = ring_buffer
533 .get_vmo(16000, notifications_per_ring)
534 .await
535 .expect("fidl")
536 .expect("response");
537
538 let mut position_info = ring_buffer.watch_clock_recovery_position_info();
540 let mut position_notifications = 0;
541
542 let _ = ring_buffer.start().await;
543
544 let frames_per_notification = frames / notifications_per_ring;
546 let expected_notifications = 12000 / frames_per_notification;
549
550 if position_info.poll_unpin(&mut Context::from_waker(&std::task::Waker::noop())).is_ready()
553 {
554 position_notifications += 1;
555 position_info = ring_buffer.watch_clock_recovery_position_info();
556 }
557 for _ in 1..100 {
558 assert_eq!(
559 Some(ProcessedRequest::ScoRead),
560 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
561 );
562 if position_info
564 .poll_unpin(&mut Context::from_waker(&std::task::Waker::noop()))
565 .is_ready()
566 {
567 position_notifications += 1;
568 position_info = ring_buffer.watch_clock_recovery_position_info();
569 }
570 }
571
572 assert!(position_notifications >= expected_notifications);
576 assert!(position_notifications <= expected_notifications + 1);
577 }
578
579 #[fuchsia::test]
580 async fn encode_sco_audio_path_msbc() {
581 use fidl_fuchsia_hardware_audio as audio;
582 let (proxy, mut audio_enumerator_requests) =
583 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
584 let mut control = InbandControl::create(proxy).unwrap();
585
586 let (connection, mut sco_request_stream) =
587 connection_for_codec(PeerId(1), CodecId::MSBC, true);
588
589 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
590
591 let audio_output_stream_config;
592 let mut _audio_input_stream_config;
593 loop {
594 match audio_enumerator_requests.next().await {
595 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
596 is_input,
597 channel,
598 ..
599 })) => {
600 if !is_input {
601 audio_output_stream_config = channel.into_proxy();
602 break;
603 } else {
604 _audio_input_stream_config = channel.into_proxy();
605 }
606 }
607 x => panic!("Expected audio device by channel, got {x:?}"),
608 }
609 }
610
611 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
612 audio_output_stream_config
613 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
614 .unwrap();
615
616 let notifications_per_ring = 20;
620 let (_frames, _vmo) = ring_buffer
622 .get_vmo(16000, notifications_per_ring)
623 .await
624 .expect("fidl")
625 .expect("response");
626
627 let _ = ring_buffer.start().await;
628
629 let next_header = &mut [0x01, 0x08];
631 for _sco_frame in 1..100 {
632 'sco: loop {
633 match process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec())
634 .await
635 {
636 Some(ProcessedRequest::ScoRead) => continue 'sco,
637 Some(ProcessedRequest::ScoWrite(data)) => {
638 assert_eq!(60, data.len());
639 assert_eq!(&ZERO_INPUT_SBC_PACKET[2..], &data[2..]);
641 assert_eq!(next_header, &data[0..2]);
642 match next_header[1] {
644 0x08 => next_header[1] = 0x38,
645 0x38 => next_header[1] = 0xc8,
646 0xc8 => next_header[1] = 0xf8,
647 0xf8 => next_header[1] = 0x08,
648 _ => unreachable!(),
649 };
650 break 'sco;
651 }
652 x => panic!("Expected read or write but got {x:?}"),
653 };
654 }
655 }
656 }
657
658 #[fuchsia::test]
659 async fn encode_sco_audio_path_cvsd() {
660 use fidl_fuchsia_hardware_audio as audio;
661 let (proxy, mut audio_enumerator_requests) =
662 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
663 let mut control = InbandControl::create(proxy).unwrap();
664
665 let (connection, mut sco_request_stream) =
666 connection_for_codec(PeerId(1), CodecId::CVSD, true);
667
668 control.start(PeerId(1), connection, CodecId::CVSD).expect("should be able to start");
669
670 let audio_output_stream_config;
671 let mut _audio_input_stream_config;
672 loop {
673 match audio_enumerator_requests.next().await {
674 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
675 is_input,
676 channel,
677 ..
678 })) => {
679 if !is_input {
680 audio_output_stream_config = channel.into_proxy();
681 break;
682 } else {
683 _audio_input_stream_config = channel.into_proxy();
684 }
685 }
686 x => panic!("Expected audio device by channel, got {x:?}"),
687 }
688 }
689
690 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
691 audio_output_stream_config
692 .create_ring_buffer(&CodecId::CVSD.try_into().unwrap(), server)
693 .unwrap();
694
695 let notifications_per_ring = 10;
699 let (_frames, _vmo) = ring_buffer
701 .get_vmo(64000, notifications_per_ring)
702 .await
703 .expect("fidl")
704 .expect("response");
705
706 let _ = ring_buffer.start().await;
707
708 for _sco_frame in 1..100 {
710 'sco: loop {
711 match process_sco_request(&mut sco_request_stream, ZERO_INPUT_CVSD_PACKET.to_vec())
712 .await
713 {
714 Some(ProcessedRequest::ScoRead) => continue 'sco,
715 Some(ProcessedRequest::ScoWrite(data)) => {
716 assert_eq!(60, data.len());
718 assert_eq!(&ZERO_INPUT_CVSD_PACKET, data.as_slice());
719 break 'sco;
720 }
721 x => panic!("Expected read or write but got {x:?}"),
722 };
723 }
724 }
725 }
726
727 #[fuchsia::test]
728 async fn read_from_audio_output() {
729 use fidl_fuchsia_hardware_audio as audio;
730 let (proxy, mut audio_enumerator_requests) =
731 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
732 let mut control = InbandControl::create(proxy).unwrap();
733
734 let (connection, mut sco_request_stream) =
735 connection_for_codec(PeerId(1), CodecId::MSBC, true);
736
737 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
738
739 let audio_output_stream_config;
740 let mut _audio_input_stream_config;
741 loop {
742 match audio_enumerator_requests.next().await {
743 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
744 is_input,
745 channel,
746 ..
747 })) => {
748 if !is_input {
749 audio_output_stream_config = channel.into_proxy();
750 break;
751 } else {
752 _audio_input_stream_config = channel.into_proxy();
753 }
754 }
755 x => panic!("Expected audio device by channel, got {x:?}"),
756 }
757 }
758
759 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
760 audio_output_stream_config
761 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
762 .expect("create ring buffer");
763
764 let notifications_per_ring = 20;
765 let (_frames, _vmo) = ring_buffer
767 .get_vmo(16000, notifications_per_ring)
768 .await
769 .expect("fidl")
770 .expect("response");
771
772 let _ = ring_buffer.start().await;
773
774 'position_notifications: for i in 1..20 {
777 let mut position_info = ring_buffer.watch_clock_recovery_position_info();
778 loop {
779 let sco_activity = Box::pin(process_sco_request(
780 &mut sco_request_stream,
781 ZERO_INPUT_SBC_PACKET.to_vec(),
782 ));
783 use futures::future::Either;
784 match futures::future::select(position_info, sco_activity).await {
785 Either::Left((result, _sco_fut)) => {
786 assert!(result.is_ok(), "Position Info failed at {i}");
787 continue 'position_notifications;
788 }
789 Either::Right((_sco_pkt, position_info_fut)) => {
790 position_info = position_info_fut;
791 }
792 }
793 }
794 }
795 }
796
797 #[fuchsia::test]
798 async fn audio_output_error_sends_to_events() {
799 let (proxy, mut audio_enumerator_requests) =
800 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
801 let mut control = InbandControl::create(proxy).unwrap();
802 let mut events = control.take_events();
803
804 let (connection, _sco_request_stream) =
805 connection_for_codec(PeerId(1), CodecId::MSBC, true);
806
807 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
808
809 let audio_output_stream_config;
810 let mut _audio_input_stream_config;
811 loop {
812 match audio_enumerator_requests.next().await {
813 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
814 is_input,
815 channel,
816 ..
817 })) => {
818 if !is_input {
819 audio_output_stream_config = channel.into_proxy();
820 break;
821 } else {
822 _audio_input_stream_config = channel.into_proxy();
823 }
824 }
825 x => panic!("Expected audio device by channel, got {x:?}"),
826 }
827 }
828
829 drop(audio_output_stream_config);
830
831 match events.next().await {
833 Some(ControlEvent::Stopped { id, error: Some(_) }) => {
834 assert_eq!(PeerId(1), id);
835 }
836 x => panic!("Expected the peer to have error stop, but got {x:?}"),
837 };
838 }
839}