1use async_utils::hanging_get::server as hanging_server;
6use fidl::endpoints::{ClientEnd, ControlHandle, Responder};
7use fidl_fuchsia_hardware_audio::{
8 CodecMarker, CodecRequestStream, CodecStartResponder, CodecStopResponder,
9 CodecWatchPlugStateResponder, PlugState,
10};
11use fidl_fuchsia_hardware_audio_signalprocessing::SignalProcessingRequestStream;
12use fuchsia_async as fasync;
13use fuchsia_sync::Mutex;
14use futures::stream::FusedStream;
15use futures::task::{Context, Poll};
16use futures::{Stream, StreamExt};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use crate::types::{Error, Result};
21
22pub struct SoftCodec {
27 codec_requests: Option<CodecRequestStream>,
29 signal_processing_requests: Option<SignalProcessingRequestStream>,
31 properties: fidl_fuchsia_hardware_audio::CodecProperties,
33 supported_formats: [fidl_fuchsia_hardware_audio::DaiSupportedFormats; 1],
36 selected_format: Arc<Mutex<Option<fidl_fuchsia_hardware_audio::DaiFormat>>>,
38 start_state: Arc<Mutex<StartState>>,
41 plug_state_publisher: hanging_server::Publisher<
43 PlugState,
44 CodecWatchPlugStateResponder,
45 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
46 >,
47 plug_state_subscriber: hanging_server::Subscriber<
49 PlugState,
50 CodecWatchPlugStateResponder,
51 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
52 >,
53 terminated: TerminatedState,
55}
56
/// Tracks how far the codec request stream is through shutting down.
///
/// The enum has no payload, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum TerminatedState {
    /// The stream is live and may still produce items.
    #[default]
    NotTerminated,
    /// A terminal error has been produced; the next poll yields `None`.
    Terminating,
    /// `None` has been produced; polling the stream again panics.
    Terminated,
}
66
/// Start/stop state of the codec.
///
/// The state is shared (behind a mutex) between the codec, which parks FIDL
/// responders in it, and the responder closures handed out in
/// [`CodecRequest`], which take the parked responder back out to reply.
#[derive(Debug)]
pub enum StartState {
    /// The codec is stopped; holds the time reported for the stop.
    Stopped(zx::MonotonicInstant),
    /// A start is in progress. Holds the FIDL responder awaiting the outcome;
    /// `None` once that responder has been taken.
    Starting(Option<CodecStartResponder>),
    /// The codec is started; holds the time reported for the start.
    Started(zx::MonotonicInstant),
    /// A stop is in progress. Holds the FIDL responder awaiting the outcome;
    /// `None` once that responder has been taken.
    Stopping(Option<CodecStopResponder>),
}
74
75impl Default for StartState {
76 fn default() -> Self {
77 Self::Stopped(fasync::MonotonicInstant::now().into())
78 }
79}
80
/// Direction(s) of audio a codec handles.
///
/// `SoftCodec::create` maps this onto the optional `is_input` codec property:
/// `Input` becomes `Some(true)`, `Output` becomes `Some(false)` and `Duplex`
/// leaves the property unset.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecDirection {
    /// The codec is an input.
    Input,
    /// The codec is an output.
    Output,
    /// The codec is both an input and an output.
    Duplex,
}
86
/// A request produced by the codec stream that its owner must answer by
/// calling the contained `responder` exactly once.
pub enum CodecRequest {
    /// The client asked for a DAI format to be selected.
    SetFormat {
        /// The format the client requested.
        format: fidl_fuchsia_hardware_audio::DaiFormat,
        /// Call with `Ok(())` to accept the format: it is recorded as the
        /// selected format and the client is sent a default `CodecFormatInfo`.
        /// Call with `Err(status)` to send the client that status as an error.
        responder: Box<dyn FnOnce(std::result::Result<(), zx::Status>) + Send>,
    },
    /// The client asked for the codec to start.
    Start {
        /// Call with `Ok(time)` to mark the codec started at `time` and send
        /// that time to the client. Calling with `Err(_)` drops the pending
        /// FIDL responder without sending a reply.
        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
    },
    /// The client asked for the codec to stop.
    Stop {
        /// Call with `Ok(time)` to mark the codec stopped at `time` and send
        /// that time to the client. Calling with `Err(_)` drops the pending
        /// FIDL responder without sending a reply.
        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
    },
}
114
115impl Debug for CodecRequest {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self {
118 Self::SetFormat { format, .. } => {
119 f.debug_struct("SetFormat").field("format", format).finish()
120 }
121 Self::Start { .. } => f.debug_struct("Start").finish(),
122 Self::Stop { .. } => f.debug_struct("Stop").finish(),
123 }
124 }
125}
126
127impl CodecRequest {
128 fn start(state: Arc<Mutex<StartState>>) -> Self {
129 Self::Start {
130 responder: Box::new(move |time| {
131 let mut lock = state.lock();
132 let StartState::Starting(ref mut responder) = *lock else {
133 log::warn!("Not in starting state, noop response");
134 return;
135 };
136 let Some(responder) = responder.take() else {
137 log::warn!("Responder missing for start, noop");
138 return;
139 };
140 let Ok(time) = time else {
141 drop(responder);
143 return;
144 };
145 *lock = StartState::Started(time);
146 let _ = responder.send(time.into_nanos());
147 }),
148 }
149 }
150
151 fn stop(state: Arc<Mutex<StartState>>) -> Self {
152 Self::Stop {
153 responder: Box::new(move |time| {
154 let mut lock = state.lock();
155 let StartState::Stopping(ref mut responder) = *lock else {
156 log::warn!("Not in stopping state, noop response");
157 return;
158 };
159 let Some(responder) = responder.take() else {
160 log::warn!("Responder missing for stop, noop");
161 return;
162 };
163 let Ok(time) = time else {
164 drop(responder);
166 return;
167 };
168 *lock = StartState::Stopped(time);
169 let _ = responder.send(time.into_nanos());
170 }),
171 }
172 }
173}
174
175impl SoftCodec {
176 pub fn create(
177 unique_id: Option<&[u8; 16]>,
178 manufacturer: &str,
179 product: &str,
180 direction: CodecDirection,
181 formats: fidl_fuchsia_hardware_audio::DaiSupportedFormats,
182 initially_plugged: bool,
183 ) -> (Self, ClientEnd<CodecMarker>) {
184 let (client, codec_requests) = fidl::endpoints::create_request_stream::<CodecMarker>();
185 let is_input = match direction {
186 CodecDirection::Input => Some(true),
187 CodecDirection::Output => Some(false),
188 CodecDirection::Duplex => None,
189 };
190 let properties = fidl_fuchsia_hardware_audio::CodecProperties {
191 is_input,
192 manufacturer: Some(manufacturer.to_string()),
193 product: Some(product.to_string()),
194 unique_id: unique_id.cloned(),
195 plug_detect_capabilities: Some(
196 fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify,
197 ),
198 ..Default::default()
199 };
200 let plug_state = fidl_fuchsia_hardware_audio::PlugState {
201 plugged: Some(initially_plugged),
202 plug_state_time: Some(fuchsia_async::MonotonicInstant::now().into_nanos()),
203 ..Default::default()
204 };
205 let mut plug_state_server = hanging_server::HangingGet::<
206 _,
207 _,
208 Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
209 >::new(
210 plug_state,
211 Box::new(|state, responder: CodecWatchPlugStateResponder| {
212 let _ = responder.send(state);
213 true
214 }),
215 );
216 let plug_state_publisher = plug_state_server.new_publisher();
217 let plug_state_subscriber = plug_state_server.new_subscriber();
218 (
219 Self {
220 codec_requests: Some(codec_requests),
221 signal_processing_requests: Default::default(),
222 properties,
223 supported_formats: [formats],
224 selected_format: Default::default(),
225 start_state: Default::default(),
226 plug_state_publisher,
227 plug_state_subscriber,
228 terminated: Default::default(),
229 },
230 client,
231 )
232 }
233
234 pub fn update_plug_state(&self, plugged: bool) -> Result<()> {
235 self.plug_state_publisher.set(fidl_fuchsia_hardware_audio::PlugState {
236 plugged: Some(plugged),
237 plug_state_time: Some(fasync::MonotonicInstant::now().into_nanos()),
238 ..Default::default()
239 });
240 Ok(())
241 }
242
243 fn poll_codec(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
246 let Some(codec_requests) = self.codec_requests.as_mut() else {
247 panic!("Codec requests polled without a request stream");
248 };
249 loop {
250 let request = futures::ready!(codec_requests.poll_next_unpin(cx));
251 let request = match request {
252 None => {
253 self.terminated = TerminatedState::Terminating;
254 return Poll::Ready(Some(Err(Error::RequestStreamError(
255 fidl::Error::ClientRead(zx::Status::PEER_CLOSED.into()),
256 ))));
257 }
258 Some(Err(e)) => {
259 self.terminated = TerminatedState::Terminating;
260 return Poll::Ready(Some(Err(Error::RequestStreamError(e))));
261 }
262 Some(Ok(request)) => request,
263 };
264 use fidl_fuchsia_hardware_audio::CodecRequest::*;
265 log::info!("Handling codec request: {request:?}");
266 match request {
267 GetHealthState { responder } => {
268 let _ = responder.send(&fidl_fuchsia_hardware_audio::HealthState::default());
269 }
270 SignalProcessingConnect { protocol, .. } => {
271 if self.signal_processing_requests.is_some() {
272 let _ = protocol.close_with_epitaph(zx::Status::ALREADY_BOUND);
273 continue;
274 }
275 let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
277 }
278 Reset { responder } => {
279 responder.control_handle().shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
281 continue;
282 }
283 GetProperties { responder } => {
284 let _ = responder.send(&self.properties);
285 }
286 Start { responder } => {
287 let mut lock = self.start_state.lock();
288 match *lock {
289 StartState::Started(time) => {
290 let _ = responder.send(time.into_nanos());
291 continue;
292 }
293 StartState::Stopped(_) => {
294 *lock = StartState::Starting(Some(responder));
295 return Poll::Ready(Some(Ok(CodecRequest::start(
296 self.start_state.clone(),
297 ))));
298 }
299 StartState::Starting(ref mut existing_responder) => {
300 log::warn!("Got start while starting, closing codec");
302 drop(responder);
303 drop(existing_responder.take());
304 }
305 _ => {
306 log::warn!("Got start while stopping, closing codec");
308 drop(responder);
309 }
310 }
311 }
312 Stop { responder } => {
313 let mut lock = self.start_state.lock();
314 match *lock {
315 StartState::Stopped(time) => {
316 let _ = responder.send(time.into_nanos());
317 continue;
318 }
319 StartState::Started(_) => {
320 *lock = StartState::Stopping(Some(responder));
321 return Poll::Ready(Some(Ok(CodecRequest::stop(
322 self.start_state.clone(),
323 ))));
324 }
325 StartState::Stopping(ref mut existing_responder) => {
326 log::warn!("Got stop while stopping, closing codec");
328 drop(responder);
329 drop(existing_responder.take());
330 }
331 _ => {
332 log::warn!("Got stop while starting, closing codec");
334 drop(responder);
335 }
336 }
337 }
338 GetDaiFormats { responder } => {
339 let _ = responder.send(Ok(self.supported_formats.as_slice()));
340 }
341 SetDaiFormat { format, responder } => {
342 let responder = Box::new({
343 let selected = self.selected_format.clone();
344 let format = format.clone();
345 move |result: std::result::Result<(), zx::Status>| {
346 let _ = match result {
347 Ok(()) => {
349 *selected.lock() = Some(format);
350 responder.send(Ok(
351 &fidl_fuchsia_hardware_audio::CodecFormatInfo::default(),
352 ))
353 }
354 Err(s) => responder.send(Err(s.into_raw())),
355 };
356 }
357 });
358 return Poll::Ready(Some(Ok(CodecRequest::SetFormat { format, responder })));
359 }
360 WatchPlugState { responder } => {
361 if let Err(_e) = self.plug_state_subscriber.register(responder) {
362 self.terminated = TerminatedState::Terminating;
364 return Poll::Ready(Some(Err(Error::PeerError(
365 "WatchPlugState while hanging".to_string(),
366 ))));
367 }
368 }
369 }
370 }
371 }
372
373 fn poll_signal(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
376 let Some(_requests) = self.signal_processing_requests.as_mut() else {
377 return Poll::Pending;
379 };
380 Poll::Pending
382 }
383}
384
385impl FusedStream for SoftCodec {
386 fn is_terminated(&self) -> bool {
387 self.terminated == TerminatedState::Terminated
388 }
389}
390
391impl Stream for SoftCodec {
392 type Item = Result<CodecRequest>;
393
394 fn poll_next(
395 mut self: std::pin::Pin<&mut Self>,
396 cx: &mut Context<'_>,
397 ) -> Poll<Option<Self::Item>> {
398 match self.terminated {
399 TerminatedState::Terminating => {
400 self.terminated = TerminatedState::Terminated;
401 self.codec_requests = None;
402 return Poll::Ready(None);
403 }
404 TerminatedState::Terminated => panic!("polled while terminated"),
405 TerminatedState::NotTerminated => {}
406 };
407 if let Poll::Ready(x) = self.poll_codec(cx) {
408 return Poll::Ready(x);
409 }
410 self.poll_signal(cx)
411 }
412}
413
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use async_utils::PollExt;
    use fixture::fixture;

    use fidl_fuchsia_hardware_audio::{
        CodecProxy, DaiFrameFormat, DaiFrameFormatStandard, DaiSampleFormat, DaiSupportedFormats,
    };

    const TEST_UNIQUE_ID: [u8; 16] = [5; 16];
    const TEST_MANUF: &str = "Fuchsia";
    const TEST_PRODUCT: &str = "Test Codec";

    /// Creates the output codec used by every test: one or two channels of
    /// 16-bit unsigned PCM over I2S at 48kHz, initially plugged.
    ///
    /// Must be called with an executor already set up, since it creates the
    /// codec's FIDL request stream.
    fn create_test_codec() -> (SoftCodec, ClientEnd<CodecMarker>) {
        SoftCodec::create(
            Some(&TEST_UNIQUE_ID),
            TEST_MANUF,
            TEST_PRODUCT,
            CodecDirection::Output,
            DaiSupportedFormats {
                number_of_channels: vec![1, 2],
                sample_formats: vec![DaiSampleFormat::PcmUnsigned],
                frame_formats: vec![DaiFrameFormat::FrameFormatStandard(
                    DaiFrameFormatStandard::I2S,
                )],
                frame_rates: vec![48000],
                bits_per_slot: vec![16],
                bits_per_sample: vec![16],
            },
            true,
        )
    }

    /// The two-channel format the tests select; it is one of the formats
    /// offered by [`create_test_codec`].
    fn test_dai_format() -> fidl_fuchsia_hardware_audio::DaiFormat {
        fidl_fuchsia_hardware_audio::DaiFormat {
            number_of_channels: 2,
            channels_to_use_bitmask: 0x3,
            sample_format: DaiSampleFormat::PcmUnsigned,
            frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
            frame_rate: 48000,
            bits_per_slot: 16,
            bits_per_sample: 16,
        }
    }

    /// Test fixture: builds an executor, a test codec and a proxy connected to
    /// it, then runs `test` with them.
    pub(crate) fn with_soft_codec<F>(_name: &str, test: F)
    where
        F: FnOnce(fasync::TestExecutor, CodecProxy, SoftCodec),
    {
        // The executor must exist before the codec's channel is created.
        let exec = fasync::TestExecutor::new();
        let (codec, client) = create_test_codec();
        test(exec, client.into_proxy(), codec)
    }

    /// Walks a well-behaved client through properties, formats, set format,
    /// start, plug-state watching, health and stop.
    #[fixture(with_soft_codec)]
    #[fuchsia::test]
    fn happy_lifecycle(mut exec: fasync::TestExecutor, proxy: CodecProxy, mut codec: SoftCodec) {
        let mut codec_next_fut = codec.next();
        let mut get_properties_fut = proxy.get_properties();
        // Polling the codec answers GetProperties internally without
        // producing a stream item.
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let properties =
            exec.run_until_stalled(&mut get_properties_fut).expect("finish").expect("ok");

        assert_eq!(properties.unique_id, Some(TEST_UNIQUE_ID));
        assert_eq!(
            properties.plug_detect_capabilities,
            Some(fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify)
        );

        let mut formats_fut = proxy.get_dai_formats();
        exec.run_until_stalled(&mut formats_fut)
            .expect_pending("shouldn't finish until codec polled");

        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let formats =
            exec.run_singlethreaded(&mut formats_fut).expect("fidl succeed").expect("format ok");
        let Some(formats) = formats.first() else {
            panic!("Expected at least one format");
        };
        assert_eq!(formats.number_of_channels.len(), 2);
        assert_eq!(formats.frame_rates[0], 48000);

        let mut set_format_fut = proxy.set_dai_format(&test_dai_format());

        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");

        // SetDaiFormat needs a decision, so it is surfaced on the stream.
        let Some(Ok(CodecRequest::SetFormat { format, responder })) =
            exec.run_singlethreaded(&mut codec_next_fut)
        else {
            panic!("Expected a SetFormat request");
        };

        assert_eq!(format.number_of_channels, 2);
        assert_eq!(format.channels_to_use_bitmask, 0x3);

        // The client is not answered until the responder is called.
        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
        responder(Ok(()));

        let Ok(Ok(codec_format_info)) = exec.run_singlethreaded(&mut set_format_fut) else {
            panic!("Expected ok response");
        };

        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());

        let mut codec_next_fut = codec.next();
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let mut start_fut = proxy.start();
        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");

        let Some(Ok(CodecRequest::Start { responder })) =
            exec.run_singlethreaded(&mut codec_next_fut)
        else {
            panic!("Expected a Start request");
        };
        let mut codec_next_fut = codec.next();

        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");

        let started_time = fasync::MonotonicInstant::now();
        responder(Ok(started_time.into()));

        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();

        assert_eq!(started_time.into_nanos(), time);

        // A second Start while already started is answered by the codec
        // itself with the original start time.
        let mut start_fut = proxy.start();
        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
        assert_eq!(started_time.into_nanos(), time);

        // The first WatchPlugState returns the initial state right away.
        let mut watch_plug_state_fut = proxy.watch_plug_state();
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let plug_state =
            exec.run_until_stalled(&mut watch_plug_state_fut).expect("should finish").expect("ok");

        assert_eq!(plug_state.plugged, Some(true));
        assert!(
            plug_state.plug_state_time.unwrap() <= fasync::MonotonicInstant::now().into_nanos()
        );

        // The second WatchPlugState hangs until the state changes.
        let mut watch_plug_state_fut = proxy.watch_plug_state();
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");

        // Release the mutable borrow of the codec so it can be called directly.
        drop(codec_next_fut);

        codec.update_plug_state(false).unwrap();

        let plug_state =
            exec.run_until_stalled(&mut watch_plug_state_fut).expect("done").expect("ok");

        assert_eq!(plug_state.plugged, Some(false));

        let mut codec_next_fut = codec.next();

        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let mut health_fut = proxy.get_health_state();
        exec.run_until_stalled(&mut health_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let _health = exec.run_until_stalled(&mut health_fut).expect("ready").expect("ok");

        let mut stop_fut = proxy.stop();
        let Poll::Ready(Some(Ok(CodecRequest::Stop { responder }))) =
            exec.run_until_stalled(&mut codec_next_fut)
        else {
            panic!("Expected a codec request to stop");
        };
        exec.run_until_stalled(&mut stop_fut).expect_pending("should pend");

        let response_time = fasync::MonotonicInstant::now();
        responder(Ok(response_time.into()));

        let Poll::Ready(Ok(received_time)) = exec.run_until_stalled(&mut stop_fut) else {
            panic!("Expected stop to finish");
        };

        assert_eq!(received_time, response_time.into_nanos());
    }

    /// A client that sends a second Start before the first is answered gets
    /// errors for both calls, and the codec stream reports an error and then
    /// terminates.
    #[fuchsia::test]
    async fn started_twice_before_response() {
        let (mut codec, client) = create_test_codec();
        let proxy = client.into_proxy();

        let set_format_fut = proxy.set_dai_format(&test_dai_format());

        let Some(Ok(CodecRequest::SetFormat { format, responder })) = codec.next().await else {
            panic!("Expected a SetFormat request");
        };

        assert_eq!(format.number_of_channels, 2);
        assert_eq!(format.channels_to_use_bitmask, 0x3);

        responder(Ok(()));

        let Ok(Ok(codec_format_info)) = set_format_fut.await else {
            panic!("Expected ok response");
        };

        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());

        let start_fut = proxy.start();

        let Some(Ok(CodecRequest::Start { responder })) = codec.next().await else {
            panic!("Expected a Start request");
        };

        // Send the second Start while the first is still unanswered.
        let start_before_response_fut = proxy.start();
        let codec_next = codec.next().await;
        let Some(Err(_)) = codec_next else {
            panic!("Expected stream to close on bad behavior from proxy: {codec_next:?}");
        };
        let codec_next = codec.next().await;
        let None = codec_next else {
            panic!("Expected stream to terminate after: {codec_next:?}");
        };
        let start_before_response = start_before_response_fut.await;
        let Err(_) = start_before_response else {
            panic!("Expected error from the second start on the proxy before a response: {start_before_response:?}");
        };

        let start_response = start_fut.await;
        let Err(_) = start_response else {
            panic!("Expected error from the first start on the proxy before a response: {start_response:?}");
        };
        // Answering after the codec has shut down must not panic.
        let response_time = fasync::MonotonicInstant::now();
        responder(Ok(response_time.into()));
    }
}