1use async_utils::hanging_get::server as hanging_server;
6use fidl::endpoints::{ClientEnd, ControlHandle, Responder};
7use fidl_fuchsia_hardware_audio::{
8 CodecMarker, CodecRequestStream, CodecStartResponder, CodecStopResponder,
9 CodecWatchPlugStateResponder, PlugState,
10};
11use fidl_fuchsia_hardware_audio_signalprocessing::SignalProcessingRequestStream;
12use fuchsia_async as fasync;
13use fuchsia_sync::Mutex;
14use futures::stream::FusedStream;
15use futures::task::{Context, Poll};
16use futures::{Stream, StreamExt};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use crate::types::{Error, Result};
21
22pub struct SoftCodec {
27 codec_requests: Option<CodecRequestStream>,
29 signal_processing_requests: Option<SignalProcessingRequestStream>,
31 properties: fidl_fuchsia_hardware_audio::CodecProperties,
33 supported_formats: [fidl_fuchsia_hardware_audio::DaiSupportedFormats; 1],
36 selected_format: Arc<Mutex<Option<fidl_fuchsia_hardware_audio::DaiFormat>>>,
38 start_state: Arc<Mutex<StartState>>,
41 plug_state_publisher: hanging_server::Publisher<
43 PlugState,
44 CodecWatchPlugStateResponder,
45 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
46 >,
47 plug_state_subscriber: hanging_server::Subscriber<
49 PlugState,
50 CodecWatchPlugStateResponder,
51 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
52 >,
53 terminated: TerminatedState,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Default)]
58enum TerminatedState {
59 #[default]
60 NotTerminated,
61 Terminating,
63 Terminated,
65}
66
67#[derive(Debug)]
68pub enum StartState {
69 Stopped(zx::MonotonicInstant),
70 Starting(Option<CodecStartResponder>),
71 Started(zx::MonotonicInstant),
72 Stopping(Option<CodecStopResponder>),
73}
74
75impl Default for StartState {
76 fn default() -> Self {
77 Self::Stopped(fasync::MonotonicInstant::now().into())
78 }
79}
80
81pub enum CodecDirection {
82 Input,
83 Output,
84 Duplex,
85}
86
87pub enum CodecRequest {
89 SetFormat {
91 format: fidl_fuchsia_hardware_audio::DaiFormat,
93 responder: Box<dyn FnOnce(std::result::Result<(), zx::Status>) + Send>,
96 },
97 Start {
99 responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
105 },
106 Stop {
108 responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
112 },
113}
114
115impl Debug for CodecRequest {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self {
118 Self::SetFormat { format, .. } => {
119 f.debug_struct("SetFormat").field("format", format).finish()
120 }
121 Self::Start { .. } => f.debug_struct("Start").finish(),
122 Self::Stop { .. } => f.debug_struct("Stop").finish(),
123 }
124 }
125}
126
127impl CodecRequest {
128 fn start(state: Arc<Mutex<StartState>>) -> Self {
129 Self::Start {
130 responder: Box::new(move |time| {
131 let mut lock = state.lock();
132 let StartState::Starting(ref mut responder) = *lock else {
133 log::warn!("Not in starting state, noop response");
134 return;
135 };
136 let Some(responder) = responder.take() else {
137 log::warn!("Responder missing for start, noop");
138 return;
139 };
140 let Ok(time) = time else {
141 drop(responder);
143 return;
144 };
145 *lock = StartState::Started(time);
146 let _ = responder.send(time.into_nanos());
147 }),
148 }
149 }
150
151 fn stop(state: Arc<Mutex<StartState>>) -> Self {
152 Self::Stop {
153 responder: Box::new(move |time| {
154 let mut lock = state.lock();
155 let StartState::Stopping(ref mut responder) = *lock else {
156 log::warn!("Not in stopping state, noop response");
157 return;
158 };
159 let Some(responder) = responder.take() else {
160 log::warn!("Responder missing for stop, noop");
161 return;
162 };
163 let Ok(time) = time else {
164 drop(responder);
166 return;
167 };
168 *lock = StartState::Stopped(time);
169 let _ = responder.send(time.into_nanos());
170 }),
171 }
172 }
173}
174
175impl SoftCodec {
176 pub fn create(
177 unique_id: Option<&[u8; 16]>,
178 manufacturer: &str,
179 product: &str,
180 direction: CodecDirection,
181 formats: fidl_fuchsia_hardware_audio::DaiSupportedFormats,
182 initially_plugged: bool,
183 ) -> (Self, ClientEnd<CodecMarker>) {
184 let (client, codec_requests) = fidl::endpoints::create_request_stream::<CodecMarker>();
185 let is_input = match direction {
186 CodecDirection::Input => Some(true),
187 CodecDirection::Output => Some(false),
188 CodecDirection::Duplex => None,
189 };
190 let properties = fidl_fuchsia_hardware_audio::CodecProperties {
191 is_input,
192 manufacturer: Some(manufacturer.to_string()),
193 product: Some(product.to_string()),
194 unique_id: unique_id.cloned(),
195 plug_detect_capabilities: Some(
196 fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify,
197 ),
198 ..Default::default()
199 };
200 let plug_state = fidl_fuchsia_hardware_audio::PlugState {
201 plugged: Some(initially_plugged),
202 plug_state_time: Some(fuchsia_async::MonotonicInstant::now().into_nanos()),
203 ..Default::default()
204 };
205 let mut plug_state_server = hanging_server::HangingGet::<
206 _,
207 _,
208 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
209 >::new(
210 plug_state,
211 Box::new(|state, responder: CodecWatchPlugStateResponder| {
212 let _ = responder.send(state);
213 true
214 }),
215 );
216 let plug_state_publisher = plug_state_server.new_publisher();
217 let plug_state_subscriber = plug_state_server.new_subscriber();
218 (
219 Self {
220 codec_requests: Some(codec_requests),
221 signal_processing_requests: Default::default(),
222 properties,
223 supported_formats: [formats],
224 selected_format: Default::default(),
225 start_state: Default::default(),
226 plug_state_publisher,
227 plug_state_subscriber,
228 terminated: Default::default(),
229 },
230 client,
231 )
232 }
233
234 pub fn update_plug_state(&self, plugged: bool) -> Result<()> {
235 self.plug_state_publisher.set(fidl_fuchsia_hardware_audio::PlugState {
236 plugged: Some(plugged),
237 plug_state_time: Some(fasync::MonotonicInstant::now().into_nanos()),
238 ..Default::default()
239 });
240 Ok(())
241 }
242
243 fn poll_codec(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
246 let Some(codec_requests) = self.codec_requests.as_mut() else {
247 panic!("Codec requests polled without a request stream");
248 };
249 loop {
250 let request = futures::ready!(codec_requests.poll_next_unpin(cx));
251 let request = match request {
252 None => {
253 self.terminated = TerminatedState::Terminating;
254 return Poll::Ready(Some(Err(Error::RequestStreamError(
255 fidl::Error::ClientRead(zx::Status::PEER_CLOSED.into()),
256 ))));
257 }
258 Some(Err(e)) => {
259 self.terminated = TerminatedState::Terminating;
260 return Poll::Ready(Some(Err(Error::RequestStreamError(e))));
261 }
262 Some(Ok(request)) => request,
263 };
264 use fidl_fuchsia_hardware_audio::CodecRequest::*;
265 log::info!("Handling codec request: {request:?}");
266 match request {
267 GetHealthState { responder } => {
268 let _ = responder.send(&fidl_fuchsia_hardware_audio::HealthState::default());
269 }
270 SignalProcessingConnect { protocol, .. } => {
271 if self.signal_processing_requests.is_some() {
272 let _ = protocol.close_with_epitaph(zx::Status::ALREADY_BOUND);
273 continue;
274 }
275 let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
277 }
278 IsBridgeable { responder } => {
279 let _ = responder.send(false);
280 }
281 SetBridgedMode { .. } => {}
283 Reset { responder } => {
284 responder.control_handle().shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
286 continue;
287 }
288 GetProperties { responder } => {
289 let _ = responder.send(&self.properties);
290 }
291 Start { responder } => {
292 let mut lock = self.start_state.lock();
293 match *lock {
294 StartState::Started(time) => {
295 let _ = responder.send(time.into_nanos());
296 continue;
297 }
298 StartState::Stopped(_) => {
299 *lock = StartState::Starting(Some(responder));
300 return Poll::Ready(Some(Ok(CodecRequest::start(
301 self.start_state.clone(),
302 ))));
303 }
304 StartState::Starting(ref mut existing_responder) => {
305 log::warn!("Got start while starting, closing codec");
307 drop(responder);
308 drop(existing_responder.take());
309 }
310 _ => {
311 log::warn!("Got start while stopping, closing codec");
313 drop(responder);
314 }
315 }
316 }
317 Stop { responder } => {
318 let mut lock = self.start_state.lock();
319 match *lock {
320 StartState::Stopped(time) => {
321 let _ = responder.send(time.into_nanos());
322 continue;
323 }
324 StartState::Started(_) => {
325 *lock = StartState::Stopping(Some(responder));
326 return Poll::Ready(Some(Ok(CodecRequest::stop(
327 self.start_state.clone(),
328 ))));
329 }
330 StartState::Stopping(ref mut existing_responder) => {
331 log::warn!("Got stop while stopping, closing codec");
333 drop(responder);
334 drop(existing_responder.take());
335 }
336 _ => {
337 log::warn!("Got stop while starting, closing codec");
339 drop(responder);
340 }
341 }
342 }
343 GetDaiFormats { responder } => {
344 let _ = responder.send(Ok(self.supported_formats.as_slice()));
345 }
346 SetDaiFormat { format, responder } => {
347 let responder = Box::new({
348 let selected = self.selected_format.clone();
349 let format = format.clone();
350 move |result: std::result::Result<(), zx::Status>| {
351 let _ = match result {
352 Ok(()) => {
354 *selected.lock() = Some(format);
355 responder.send(Ok(
356 &fidl_fuchsia_hardware_audio::CodecFormatInfo::default(),
357 ))
358 }
359 Err(s) => responder.send(Err(s.into_raw())),
360 };
361 }
362 });
363 return Poll::Ready(Some(Ok(CodecRequest::SetFormat { format, responder })));
364 }
365 WatchPlugState { responder } => {
366 if let Err(_e) = self.plug_state_subscriber.register(responder) {
367 self.terminated = TerminatedState::Terminating;
369 return Poll::Ready(Some(Err(Error::PeerError(
370 "WatchPlugState while hanging".to_string(),
371 ))));
372 }
373 }
374 }
375 }
376 }
377
378 fn poll_signal(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
381 let Some(_requests) = self.signal_processing_requests.as_mut() else {
382 return Poll::Pending;
384 };
385 Poll::Pending
387 }
388}
389
390impl FusedStream for SoftCodec {
391 fn is_terminated(&self) -> bool {
392 self.terminated == TerminatedState::Terminated
393 }
394}
395
396impl Stream for SoftCodec {
397 type Item = Result<CodecRequest>;
398
399 fn poll_next(
400 mut self: std::pin::Pin<&mut Self>,
401 cx: &mut Context<'_>,
402 ) -> Poll<Option<Self::Item>> {
403 match self.terminated {
404 TerminatedState::Terminating => {
405 self.terminated = TerminatedState::Terminated;
406 self.codec_requests = None;
407 return Poll::Ready(None);
408 }
409 TerminatedState::Terminated => panic!("polled while terminated"),
410 TerminatedState::NotTerminated => {}
411 };
412 if let Poll::Ready(x) = self.poll_codec(cx) {
413 return Poll::Ready(x);
414 }
415 self.poll_signal(cx)
416 }
417}
418
419#[cfg(test)]
420pub(crate) mod tests {
421 use super::*;
422
423 use async_utils::PollExt;
424 use fixture::fixture;
425
426 use fidl_fuchsia_hardware_audio::{
427 CodecProxy, DaiFrameFormat, DaiFrameFormatStandard, DaiSampleFormat, DaiSupportedFormats,
428 };
429
430 const TEST_UNIQUE_ID: [u8; 16] = [5; 16];
431 const TEST_MANUF: &'static str = "Fuchsia";
432 const TEST_PRODUCT: &'static str = "Test Codec";
433
434 pub(crate) fn with_soft_codec<F>(_name: &str, test: F)
435 where
436 F: FnOnce(fasync::TestExecutor, CodecProxy, SoftCodec) -> (),
437 {
438 let exec = fasync::TestExecutor::new();
439 let (codec, client) = SoftCodec::create(
440 Some(&TEST_UNIQUE_ID),
441 TEST_MANUF,
442 TEST_PRODUCT,
443 CodecDirection::Output,
444 DaiSupportedFormats {
445 number_of_channels: vec![1, 2],
446 sample_formats: vec![DaiSampleFormat::PcmUnsigned],
447 frame_formats: vec![DaiFrameFormat::FrameFormatStandard(
449 DaiFrameFormatStandard::I2S,
450 )],
451 frame_rates: vec![48000],
452 bits_per_slot: vec![16],
453 bits_per_sample: vec![16],
454 },
455 true,
456 );
457 test(exec, client.into_proxy(), codec)
458 }
459
460 #[fixture(with_soft_codec)]
461 #[fuchsia::test]
462 fn happy_lifecycle(mut exec: fasync::TestExecutor, proxy: CodecProxy, mut codec: SoftCodec) {
463 let mut codec_next_fut = codec.next();
464 let mut get_properties_fut = proxy.get_properties();
465 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
467 let properties =
468 exec.run_until_stalled(&mut get_properties_fut).expect("finish").expect("ok");
469
470 assert_eq!(properties.unique_id, Some(TEST_UNIQUE_ID));
471 assert_eq!(
472 properties.plug_detect_capabilities,
473 Some(fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify)
474 );
475
476 let mut formats_fut = proxy.get_dai_formats();
477 exec.run_until_stalled(&mut formats_fut)
479 .expect_pending("shouldn't finish until codec polled");
480
481 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
483
484 let formats =
486 exec.run_singlethreaded(&mut formats_fut).expect("fidl succeed").expect("format ok");
487 let Some(formats) = formats.first() else {
488 panic!("Expected at least one format");
489 };
490 assert_eq!(formats.number_of_channels.len(), 2);
491 assert_eq!(formats.frame_rates[0], 48000);
492
493 let mut set_format_fut = proxy.set_dai_format(&fidl_fuchsia_hardware_audio::DaiFormat {
494 number_of_channels: 2,
495 channels_to_use_bitmask: 0x3,
496 sample_format: DaiSampleFormat::PcmUnsigned,
497 frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
498 frame_rate: 48000,
499 bits_per_slot: 16,
500 bits_per_sample: 16,
501 });
502
503 exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
505
506 let Some(Ok(CodecRequest::SetFormat { format, responder })) =
509 exec.run_singlethreaded(&mut codec_next_fut)
510 else {
511 panic!("Expected a SetFormat request");
512 };
513
514 assert_eq!(format.number_of_channels, 2);
515 assert_eq!(format.channels_to_use_bitmask, 0x3);
516
517 exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
518 responder(Ok(()));
519
520 let Ok(Ok(codec_format_info)) = exec.run_singlethreaded(&mut set_format_fut) else {
521 panic!("Expected ok response");
522 };
523
524 assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());
525
526 let mut codec_next_fut = codec.next();
527 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
528
529 let mut start_fut = proxy.start();
530 exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
531
532 let Some(Ok(CodecRequest::Start { responder })) =
533 exec.run_singlethreaded(&mut codec_next_fut)
534 else {
535 panic!("Expected a Start request");
536 };
537 let mut codec_next_fut = codec.next();
538
539 exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
540
541 let started_time = fasync::MonotonicInstant::now();
542 responder(Ok(started_time.into()));
543
544 let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
545
546 assert_eq!(started_time.into_nanos(), time);
547
548 let mut start_fut = proxy.start();
550 exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
551 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
552
553 let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
554 assert_eq!(started_time.into_nanos(), time);
555
556 let mut watch_plug_state_fut = proxy.watch_plug_state();
557 exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
558 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
559 let plug_state =
560 exec.run_until_stalled(&mut watch_plug_state_fut).expect("should finish").expect("ok");
561
562 assert_eq!(plug_state.plugged, Some(true));
563 assert!(
564 plug_state.plug_state_time.unwrap() <= fasync::MonotonicInstant::now().into_nanos()
565 );
566
567 let mut watch_plug_state_fut = proxy.watch_plug_state();
569 exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
570 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
571 exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
572
573 drop(codec_next_fut);
574
575 codec.update_plug_state(false).unwrap();
577
578 let plug_state =
579 exec.run_until_stalled(&mut watch_plug_state_fut).expect("done").expect("ok");
580
581 assert_eq!(plug_state.plugged, Some(false));
582
583 let mut codec_next_fut = codec.next();
584
585 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
586
587 let mut health_fut = proxy.get_health_state();
589 exec.run_until_stalled(&mut health_fut).expect_pending("should pend");
590 exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
591 let _health = exec.run_until_stalled(&mut health_fut).expect("ready").expect("ok");
592
593 let mut stop_fut = proxy.stop();
594 let Poll::Ready(Some(Ok(CodecRequest::Stop { responder }))) =
595 exec.run_until_stalled(&mut codec_next_fut)
596 else {
597 panic!("Expected a codec request to stop");
598 };
599 exec.run_until_stalled(&mut stop_fut).expect_pending("should pend");
600
601 let response_time = fasync::MonotonicInstant::now();
602 responder(Ok(response_time.into()));
603
604 let Poll::Ready(Ok(received_time)) = exec.run_until_stalled(&mut stop_fut) else {
605 panic!("Expected stop to finish");
606 };
607
608 assert_eq!(received_time, response_time.into_nanos());
609 }
610
611 #[fuchsia::test]
612 async fn started_twice_before_response() {
613 let (mut codec, client) = SoftCodec::create(
614 Some(&TEST_UNIQUE_ID),
615 TEST_MANUF,
616 TEST_PRODUCT,
617 CodecDirection::Output,
618 DaiSupportedFormats {
619 number_of_channels: vec![1, 2],
620 sample_formats: vec![DaiSampleFormat::PcmUnsigned],
621 frame_formats: vec![DaiFrameFormat::FrameFormatStandard(
623 DaiFrameFormatStandard::I2S,
624 )],
625 frame_rates: vec![48000],
626 bits_per_slot: vec![16],
627 bits_per_sample: vec![16],
628 },
629 true,
630 );
631 let proxy = client.into_proxy();
632
633 let set_format_fut = proxy.set_dai_format(&fidl_fuchsia_hardware_audio::DaiFormat {
634 number_of_channels: 2,
635 channels_to_use_bitmask: 0x3,
636 sample_format: DaiSampleFormat::PcmUnsigned,
637 frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
638 frame_rate: 48000,
639 bits_per_slot: 16,
640 bits_per_sample: 16,
641 });
642
643 let Some(Ok(CodecRequest::SetFormat { format, responder })) = codec.next().await else {
646 panic!("Expected a SetFormat request");
647 };
648
649 assert_eq!(format.number_of_channels, 2);
650 assert_eq!(format.channels_to_use_bitmask, 0x3);
651
652 responder(Ok(()));
653
654 let Ok(Ok(codec_format_info)) = set_format_fut.await else {
655 panic!("Expeted ok response");
656 };
657
658 assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());
659
660 let start_fut = proxy.start();
661
662 let Some(Ok(CodecRequest::Start { responder })) = codec.next().await else {
663 panic!("Expected a Start request");
664 };
665
666 let start_before_response_fut = proxy.start();
667 let codec_next = codec.next().await;
669 let Some(Err(_)) = codec_next else {
670 panic!("Expected stream to close on bad behavior from proxy: {codec_next:?}");
671 };
672 let codec_next = codec.next().await;
673 let None = codec_next else {
674 panic!("Expected stream to terminate after: {codec_next:?}");
675 };
676 let start_before_response = start_before_response_fut.await;
678 let Err(_) = start_before_response else {
679 panic!("Expected error from the second start on the proxy before a response: {start_before_response:?}");
680 };
681
682 let start_response = start_fut.await;
684 let Err(_) = start_response else {
685 panic!("Expected error from the first start on the proxy before a response: {start_response:?}");
686 };
687 let response_time = fasync::MonotonicInstant::now();
689 responder(Ok(response_time.into()));
690 }
691}