fuchsia_audio_device/
codec.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use async_utils::hanging_get::server as hanging_server;
6use fidl::endpoints::{ClientEnd, ControlHandle, Responder};
7use fidl_fuchsia_hardware_audio::{
8    CodecMarker, CodecRequestStream, CodecStartResponder, CodecStopResponder,
9    CodecWatchPlugStateResponder, PlugState,
10};
11use fidl_fuchsia_hardware_audio_signalprocessing::SignalProcessingRequestStream;
12use fuchsia_async as fasync;
13use fuchsia_sync::Mutex;
14use futures::stream::FusedStream;
15use futures::task::{Context, Poll};
16use futures::{Stream, StreamExt};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use crate::types::{Error, Result};
21
22/// A software fuchsia audio codec.
23/// Codecs communicate their capabilities to the system and provide control points for components
24/// to start, stop, and configure media capabilities.
25/// See [[Audio Codec Interface]](https://fuchsia.dev/fuchsia-src/concepts/drivers/driver_architectures/audio_drivers/audio_codec)
26pub struct SoftCodec {
27    /// Requests from Media for Codec control (start/stop)
28    codec_requests: Option<CodecRequestStream>,
29    /// Requests from Media if the SignalProcessing Connector is connected
30    signal_processing_requests: Option<SignalProcessingRequestStream>,
31    /// CodecProperties of this device (do not change)
32    properties: fidl_fuchsia_hardware_audio::CodecProperties,
33    /// Formats that are supported by this codec.  These do not change for the lifetime of the
34    /// codec.
35    supported_formats: [fidl_fuchsia_hardware_audio::DaiSupportedFormats; 1],
36    /// Selected format, if it is selected
37    selected_format: Arc<Mutex<Option<fidl_fuchsia_hardware_audio::DaiFormat>>>,
38    /// Codec Start state, including time started/stopped.
39    /// Used to correctly respond to Start / Stop when already started/stopped
40    start_state: Arc<Mutex<StartState>>,
41    /// Plug State publisher
42    plug_state_publisher: hanging_server::Publisher<
43        PlugState,
44        CodecWatchPlugStateResponder,
45        Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
46    >,
47    /// Plug State subscriber
48    plug_state_subscriber: hanging_server::Subscriber<
49        PlugState,
50        CodecWatchPlugStateResponder,
51        Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
52    >,
53    /// True when the codec has terminated, and should not be polled any more.
54    terminated: TerminatedState,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Default)]
58enum TerminatedState {
59    #[default]
60    NotTerminated,
61    /// Terminating. We have returned an Err, but not a None yet.
62    Terminating,
63    /// We have returned a None, no one should poll us again.
64    Terminated,
65}
66
67#[derive(Debug)]
68pub enum StartState {
69    Stopped(zx::MonotonicInstant),
70    Starting(Option<CodecStartResponder>),
71    Started(zx::MonotonicInstant),
72    Stopping(Option<CodecStopResponder>),
73}
74
75impl Default for StartState {
76    fn default() -> Self {
77        Self::Stopped(fasync::MonotonicInstant::now().into())
78    }
79}
80
81pub enum CodecDirection {
82    Input,
83    Output,
84    Duplex,
85}
86
87/// Request from the Audio subsystem to query or perform an action on the codec.
88pub enum CodecRequest {
89    /// Set the format of this codec.
90    SetFormat {
91        /// The format requested.
92        format: fidl_fuchsia_hardware_audio::DaiFormat,
93        /// Responder to be called when the format has been set with Ok(()) or Err(Status) if
94        /// setting the format failed.
95        responder: Box<dyn FnOnce(std::result::Result<(), zx::Status>) + Send>,
96    },
97    /// Start the codec.
98    Start {
99        /// Responder to be called when the codec has been started, providing the time started or
100        /// an error if it failed to start.
101        /// The time is system monotonic when configuring the codec to start completed (it does not
102        /// include startup delay time)
103        /// Replying Err() to this will close the codec, and it will need to be re-instantiated.
104        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
105    },
106    /// Stop the codec.
107    Stop {
108        /// Responder to be called when the codec has been configured to stop.
109        /// On success, provides the time that the codec was configured, not including delays.
110        /// On failuyre, this will close the codec and it will need to be re-instantiated.
111        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
112    },
113}
114
115impl Debug for CodecRequest {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::SetFormat { format, .. } => {
119                f.debug_struct("SetFormat").field("format", format).finish()
120            }
121            Self::Start { .. } => f.debug_struct("Start").finish(),
122            Self::Stop { .. } => f.debug_struct("Stop").finish(),
123        }
124    }
125}
126
127impl CodecRequest {
128    fn start(state: Arc<Mutex<StartState>>) -> Self {
129        Self::Start {
130            responder: Box::new(move |time| {
131                let mut lock = state.lock();
132                let StartState::Starting(ref mut responder) = *lock else {
133                    log::warn!("Not in starting state, noop response");
134                    return;
135                };
136                let Some(responder) = responder.take() else {
137                    log::warn!("Responder missing for start, noop");
138                    return;
139                };
140                let Ok(time) = time else {
141                    // Some error happened here, we close the channel by dropping the responder.
142                    drop(responder);
143                    return;
144                };
145                *lock = StartState::Started(time);
146                let _ = responder.send(time.into_nanos());
147            }),
148        }
149    }
150
151    fn stop(state: Arc<Mutex<StartState>>) -> Self {
152        Self::Stop {
153            responder: Box::new(move |time| {
154                let mut lock = state.lock();
155                let StartState::Stopping(ref mut responder) = *lock else {
156                    log::warn!("Not in stopping state, noop response");
157                    return;
158                };
159                let Some(responder) = responder.take() else {
160                    log::warn!("Responder missing for stop, noop");
161                    return;
162                };
163                let Ok(time) = time else {
164                    // Some error happened here, we close the channel by dropping the responder.
165                    drop(responder);
166                    return;
167                };
168                *lock = StartState::Stopped(time);
169                let _ = responder.send(time.into_nanos());
170            }),
171        }
172    }
173}
174
175impl SoftCodec {
176    pub fn create(
177        unique_id: Option<&[u8; 16]>,
178        manufacturer: &str,
179        product: &str,
180        direction: CodecDirection,
181        formats: fidl_fuchsia_hardware_audio::DaiSupportedFormats,
182        initially_plugged: bool,
183    ) -> (Self, ClientEnd<CodecMarker>) {
184        let (client, codec_requests) = fidl::endpoints::create_request_stream::<CodecMarker>();
185        let is_input = match direction {
186            CodecDirection::Input => Some(true),
187            CodecDirection::Output => Some(false),
188            CodecDirection::Duplex => None,
189        };
190        let properties = fidl_fuchsia_hardware_audio::CodecProperties {
191            is_input,
192            manufacturer: Some(manufacturer.to_string()),
193            product: Some(product.to_string()),
194            unique_id: unique_id.cloned(),
195            plug_detect_capabilities: Some(
196                fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify,
197            ),
198            ..Default::default()
199        };
200        let plug_state = fidl_fuchsia_hardware_audio::PlugState {
201            plugged: Some(initially_plugged),
202            plug_state_time: Some(fuchsia_async::MonotonicInstant::now().into_nanos()),
203            ..Default::default()
204        };
205        let mut plug_state_server = hanging_server::HangingGet::<
206            _,
207            _,
208            Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
209        >::new(
210            plug_state,
211            Box::new(|state, responder: CodecWatchPlugStateResponder| {
212                let _ = responder.send(state);
213                true
214            }),
215        );
216        let plug_state_publisher = plug_state_server.new_publisher();
217        let plug_state_subscriber = plug_state_server.new_subscriber();
218        (
219            Self {
220                codec_requests: Some(codec_requests),
221                signal_processing_requests: Default::default(),
222                properties,
223                supported_formats: [formats],
224                selected_format: Default::default(),
225                start_state: Default::default(),
226                plug_state_publisher,
227                plug_state_subscriber,
228                terminated: Default::default(),
229            },
230            client,
231        )
232    }
233
234    pub fn update_plug_state(&self, plugged: bool) -> Result<()> {
235        self.plug_state_publisher.set(fidl_fuchsia_hardware_audio::PlugState {
236            plugged: Some(plugged),
237            plug_state_time: Some(fasync::MonotonicInstant::now().into_nanos()),
238            ..Default::default()
239        });
240        Ok(())
241    }
242
243    /// Polls the Codec requests, and handles any requests it can, or returns Poll::Ready if the
244    /// request requires a response, generates an event, or an error occurs.
245    fn poll_codec(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
246        let Some(codec_requests) = self.codec_requests.as_mut() else {
247            panic!("Codec requests polled without a request stream");
248        };
249        loop {
250            let request = futures::ready!(codec_requests.poll_next_unpin(cx));
251            let request = match request {
252                None => {
253                    self.terminated = TerminatedState::Terminating;
254                    return Poll::Ready(Some(Err(Error::RequestStreamError(
255                        fidl::Error::ClientRead(zx::Status::PEER_CLOSED.into()),
256                    ))));
257                }
258                Some(Err(e)) => {
259                    self.terminated = TerminatedState::Terminating;
260                    return Poll::Ready(Some(Err(Error::RequestStreamError(e))));
261                }
262                Some(Ok(request)) => request,
263            };
264            use fidl_fuchsia_hardware_audio::CodecRequest::*;
265            log::info!("Handling codec request: {request:?}");
266            match request {
267                GetHealthState { responder } => {
268                    let _ = responder.send(&fidl_fuchsia_hardware_audio::HealthState::default());
269                }
270                SignalProcessingConnect { protocol, .. } => {
271                    if self.signal_processing_requests.is_some() {
272                        let _ = protocol.close_with_epitaph(zx::Status::ALREADY_BOUND);
273                        continue;
274                    }
275                    // TODO(https://fxbug.dev/323576893): Support the Signal Processing connect.
276                    let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
277                }
278                IsBridgeable { responder } => {
279                    let _ = responder.send(false);
280                }
281                // Not supported, deprecated.  Ignore.
282                SetBridgedMode { .. } => {}
283                Reset { responder } => {
284                    // TODO(https://fxbug.dev/324247020): Support resetting.
285                    responder.control_handle().shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
286                    continue;
287                }
288                GetProperties { responder } => {
289                    let _ = responder.send(&self.properties);
290                }
291                Start { responder } => {
292                    let mut lock = self.start_state.lock();
293                    match *lock {
294                        StartState::Started(time) => {
295                            let _ = responder.send(time.into_nanos());
296                            continue;
297                        }
298                        StartState::Stopped(_) => {
299                            *lock = StartState::Starting(Some(responder));
300                            return Poll::Ready(Some(Ok(CodecRequest::start(
301                                self.start_state.clone(),
302                            ))));
303                        }
304                        StartState::Starting(ref mut existing_responder) => {
305                            // Started while starting
306                            log::warn!("Got start while starting, closing codec");
307                            drop(responder);
308                            drop(existing_responder.take());
309                        }
310                        _ => {
311                            // Started while stopping
312                            log::warn!("Got start while stopping, closing codec");
313                            drop(responder);
314                        }
315                    }
316                }
317                Stop { responder } => {
318                    let mut lock = self.start_state.lock();
319                    match *lock {
320                        StartState::Stopped(time) => {
321                            let _ = responder.send(time.into_nanos());
322                            continue;
323                        }
324                        StartState::Started(_) => {
325                            *lock = StartState::Stopping(Some(responder));
326                            return Poll::Ready(Some(Ok(CodecRequest::stop(
327                                self.start_state.clone(),
328                            ))));
329                        }
330                        StartState::Stopping(ref mut existing_responder) => {
331                            // Stopped while stopping
332                            log::warn!("Got stop while stopping, closing codec");
333                            drop(responder);
334                            drop(existing_responder.take());
335                        }
336                        _ => {
337                            // Started while stopping.
338                            log::warn!("Got stop while starting, closing codec");
339                            drop(responder);
340                        }
341                    }
342                }
343                GetDaiFormats { responder } => {
344                    let _ = responder.send(Ok(self.supported_formats.as_slice()));
345                }
346                SetDaiFormat { format, responder } => {
347                    let responder = Box::new({
348                        let selected = self.selected_format.clone();
349                        let format = format.clone();
350                        move |result: std::result::Result<(), zx::Status>| {
351                            let _ = match result {
352                                // TODO(https://fxbug.dev/42128949): support specifying delay times
353                                Ok(()) => {
354                                    *selected.lock() = Some(format);
355                                    responder.send(Ok(
356                                        &fidl_fuchsia_hardware_audio::CodecFormatInfo::default(),
357                                    ))
358                                }
359                                Err(s) => responder.send(Err(s.into_raw())),
360                            };
361                        }
362                    });
363                    return Poll::Ready(Some(Ok(CodecRequest::SetFormat { format, responder })));
364                }
365                WatchPlugState { responder } => {
366                    if let Err(_e) = self.plug_state_subscriber.register(responder) {
367                        // Responder will be dropped, closing the protocol.
368                        self.terminated = TerminatedState::Terminating;
369                        return Poll::Ready(Some(Err(Error::PeerError(
370                            "WatchPlugState while hanging".to_string(),
371                        ))));
372                    }
373                }
374            }
375        }
376    }
377
378    /// Polls the SignalProcessing requests if it exists, and handles any requests it can, or returns Poll::Ready
379    /// if the request requires a response, generates an event, or an error occurs.
380    fn poll_signal(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
381        let Some(_requests) = self.signal_processing_requests.as_mut() else {
382            // The codec will wake the context when SignalProcessing is connected.
383            return Poll::Pending;
384        };
385        // TODO(https://fxbug.dev/323576893): Support Signal Processing
386        Poll::Pending
387    }
388}
389
390impl FusedStream for SoftCodec {
391    fn is_terminated(&self) -> bool {
392        self.terminated == TerminatedState::Terminated
393    }
394}
395
396impl Stream for SoftCodec {
397    type Item = Result<CodecRequest>;
398
399    fn poll_next(
400        mut self: std::pin::Pin<&mut Self>,
401        cx: &mut Context<'_>,
402    ) -> Poll<Option<Self::Item>> {
403        match self.terminated {
404            TerminatedState::Terminating => {
405                self.terminated = TerminatedState::Terminated;
406                self.codec_requests = None;
407                return Poll::Ready(None);
408            }
409            TerminatedState::Terminated => panic!("polled while terminated"),
410            TerminatedState::NotTerminated => {}
411        };
412        if let Poll::Ready(x) = self.poll_codec(cx) {
413            return Poll::Ready(x);
414        }
415        self.poll_signal(cx)
416    }
417}
418
419#[cfg(test)]
420pub(crate) mod tests {
421    use super::*;
422
423    use async_utils::PollExt;
424    use fixture::fixture;
425
426    use fidl_fuchsia_hardware_audio::{
427        CodecProxy, DaiFrameFormat, DaiFrameFormatStandard, DaiSampleFormat, DaiSupportedFormats,
428    };
429
430    const TEST_UNIQUE_ID: [u8; 16] = [5; 16];
431    const TEST_MANUF: &'static str = "Fuchsia";
432    const TEST_PRODUCT: &'static str = "Test Codec";
433
434    pub(crate) fn with_soft_codec<F>(_name: &str, test: F)
435    where
436        F: FnOnce(fasync::TestExecutor, CodecProxy, SoftCodec) -> (),
437    {
438        let exec = fasync::TestExecutor::new();
439        let (codec, client) = SoftCodec::create(
440            Some(&TEST_UNIQUE_ID),
441            TEST_MANUF,
442            TEST_PRODUCT,
443            CodecDirection::Output,
444            DaiSupportedFormats {
445                number_of_channels: vec![1, 2],
446                sample_formats: vec![DaiSampleFormat::PcmUnsigned],
447                // TODO(https://fxbug.dev/278283913): Configuration for which FrameFormatStandard the chip expects
448                frame_formats: vec![DaiFrameFormat::FrameFormatStandard(
449                    DaiFrameFormatStandard::I2S,
450                )],
451                frame_rates: vec![48000],
452                bits_per_slot: vec![16],
453                bits_per_sample: vec![16],
454            },
455            true,
456        );
457        test(exec, client.into_proxy(), codec)
458    }
459
460    #[fixture(with_soft_codec)]
461    #[fuchsia::test]
462    fn happy_lifecycle(mut exec: fasync::TestExecutor, proxy: CodecProxy, mut codec: SoftCodec) {
463        let mut codec_next_fut = codec.next();
464        let mut get_properties_fut = proxy.get_properties();
465        // Codec can answer this on it's own, this should not complete.
466        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
467        let properties =
468            exec.run_until_stalled(&mut get_properties_fut).expect("finish").expect("ok");
469
470        assert_eq!(properties.unique_id, Some(TEST_UNIQUE_ID));
471        assert_eq!(
472            properties.plug_detect_capabilities,
473            Some(fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify)
474        );
475
476        let mut formats_fut = proxy.get_dai_formats();
477        // Should not be complete
478        exec.run_until_stalled(&mut formats_fut)
479            .expect_pending("shouldn't finish until codec polled");
480
481        // Codec can answer this on it's own, this should not complete.
482        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
483
484        // Should now
485        let formats =
486            exec.run_singlethreaded(&mut formats_fut).expect("fidl succeed").expect("format ok");
487        let Some(formats) = formats.first() else {
488            panic!("Expected at least one format");
489        };
490        assert_eq!(formats.number_of_channels.len(), 2);
491        assert_eq!(formats.frame_rates[0], 48000);
492
493        let mut set_format_fut = proxy.set_dai_format(&fidl_fuchsia_hardware_audio::DaiFormat {
494            number_of_channels: 2,
495            channels_to_use_bitmask: 0x3,
496            sample_format: DaiSampleFormat::PcmUnsigned,
497            frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
498            frame_rate: 48000,
499            bits_per_slot: 16,
500            bits_per_sample: 16,
501        });
502
503        // should not complete
504        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
505
506        // Should make a request
507        // Codec can answer this on it's own, this should not complete.
508        let Some(Ok(CodecRequest::SetFormat { format, responder })) =
509            exec.run_singlethreaded(&mut codec_next_fut)
510        else {
511            panic!("Expected a SetFormat request");
512        };
513
514        assert_eq!(format.number_of_channels, 2);
515        assert_eq!(format.channels_to_use_bitmask, 0x3);
516
517        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
518        responder(Ok(()));
519
520        let Ok(Ok(codec_format_info)) = exec.run_singlethreaded(&mut set_format_fut) else {
521            panic!("Expected ok response");
522        };
523
524        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());
525
526        let mut codec_next_fut = codec.next();
527        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
528
529        let mut start_fut = proxy.start();
530        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
531
532        let Some(Ok(CodecRequest::Start { responder })) =
533            exec.run_singlethreaded(&mut codec_next_fut)
534        else {
535            panic!("Expected a Start request");
536        };
537        let mut codec_next_fut = codec.next();
538
539        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
540
541        let started_time = fasync::MonotonicInstant::now();
542        responder(Ok(started_time.into()));
543
544        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
545
546        assert_eq!(started_time.into_nanos(), time);
547
548        // Asking to start again should return the same start time.
549        let mut start_fut = proxy.start();
550        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
551        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
552
553        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
554        assert_eq!(started_time.into_nanos(), time);
555
556        let mut watch_plug_state_fut = proxy.watch_plug_state();
557        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
558        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
559        let plug_state =
560            exec.run_until_stalled(&mut watch_plug_state_fut).expect("should finish").expect("ok");
561
562        assert_eq!(plug_state.plugged, Some(true));
563        assert!(
564            plug_state.plug_state_time.unwrap() <= fasync::MonotonicInstant::now().into_nanos()
565        );
566
567        // Second watch plug state should hang.
568        let mut watch_plug_state_fut = proxy.watch_plug_state();
569        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
570        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
571        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
572
573        drop(codec_next_fut);
574
575        // Until we send a plug update.
576        codec.update_plug_state(false).unwrap();
577
578        let plug_state =
579            exec.run_until_stalled(&mut watch_plug_state_fut).expect("done").expect("ok");
580
581        assert_eq!(plug_state.plugged, Some(false));
582
583        let mut codec_next_fut = codec.next();
584
585        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
586
587        // Check on the health, but there's nothing interesting about the response.
588        let mut health_fut = proxy.get_health_state();
589        exec.run_until_stalled(&mut health_fut).expect_pending("should pend");
590        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
591        let _health = exec.run_until_stalled(&mut health_fut).expect("ready").expect("ok");
592
593        let mut stop_fut = proxy.stop();
594        let Poll::Ready(Some(Ok(CodecRequest::Stop { responder }))) =
595            exec.run_until_stalled(&mut codec_next_fut)
596        else {
597            panic!("Expected a codec request to stop");
598        };
599        exec.run_until_stalled(&mut stop_fut).expect_pending("should pend");
600
601        let response_time = fasync::MonotonicInstant::now();
602        responder(Ok(response_time.into()));
603
604        let Poll::Ready(Ok(received_time)) = exec.run_until_stalled(&mut stop_fut) else {
605            panic!("Expected stop to finish");
606        };
607
608        assert_eq!(received_time, response_time.into_nanos());
609    }
610
611    #[fuchsia::test]
612    async fn started_twice_before_response() {
613        let (mut codec, client) = SoftCodec::create(
614            Some(&TEST_UNIQUE_ID),
615            TEST_MANUF,
616            TEST_PRODUCT,
617            CodecDirection::Output,
618            DaiSupportedFormats {
619                number_of_channels: vec![1, 2],
620                sample_formats: vec![DaiSampleFormat::PcmUnsigned],
621                // TODO(https://fxbug.dev/278283913): Configuration for which FrameFormatStandard the chip expects
622                frame_formats: vec![DaiFrameFormat::FrameFormatStandard(
623                    DaiFrameFormatStandard::I2S,
624                )],
625                frame_rates: vec![48000],
626                bits_per_slot: vec![16],
627                bits_per_sample: vec![16],
628            },
629            true,
630        );
631        let proxy = client.into_proxy();
632
633        let set_format_fut = proxy.set_dai_format(&fidl_fuchsia_hardware_audio::DaiFormat {
634            number_of_channels: 2,
635            channels_to_use_bitmask: 0x3,
636            sample_format: DaiSampleFormat::PcmUnsigned,
637            frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
638            frame_rate: 48000,
639            bits_per_slot: 16,
640            bits_per_sample: 16,
641        });
642
643        // Should make a request
644        // Codec can answer this on it's own, this should not complete.
645        let Some(Ok(CodecRequest::SetFormat { format, responder })) = codec.next().await else {
646            panic!("Expected a SetFormat request");
647        };
648
649        assert_eq!(format.number_of_channels, 2);
650        assert_eq!(format.channels_to_use_bitmask, 0x3);
651
652        responder(Ok(()));
653
654        let Ok(Ok(codec_format_info)) = set_format_fut.await else {
655            panic!("Expeted ok response");
656        };
657
658        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());
659
660        let start_fut = proxy.start();
661
662        let Some(Ok(CodecRequest::Start { responder })) = codec.next().await else {
663            panic!("Expected a Start request");
664        };
665
666        let start_before_response_fut = proxy.start();
667        // The codec stream closes.
668        let codec_next = codec.next().await;
669        let Some(Err(_)) = codec_next else {
670            panic!("Expected stream to close on bad behavior from proxy: {codec_next:?}");
671        };
672        let codec_next = codec.next().await;
673        let None = codec_next else {
674            panic!("Expected stream to terminate after: {codec_next:?}");
675        };
676        // The start that shut us down should fail
677        let start_before_response = start_before_response_fut.await;
678        let Err(_) = start_before_response else {
679            panic!("Expected error from the second start on the proxy before a response: {start_before_response:?}");
680        };
681
682        // The previous start call also will fail.
683        let start_response = start_fut.await;
684        let Err(_) = start_response else {
685            panic!("Expected error from the first start on the proxy before a response: {start_response:?}");
686        };
687        // The responder should be ok to call even though it has no effect.
688        let response_time = fasync::MonotonicInstant::now();
689        responder(Ok(response_time.into()));
690    }
691}