fuchsia_audio_device/
codec.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use async_utils::hanging_get::server as hanging_server;
6use fidl::endpoints::{ClientEnd, ControlHandle, Responder};
7use fidl_fuchsia_hardware_audio::{
8    CodecMarker, CodecRequestStream, CodecStartResponder, CodecStopResponder,
9    CodecWatchPlugStateResponder, PlugState,
10};
11use fidl_fuchsia_hardware_audio_signalprocessing::SignalProcessingRequestStream;
12use fuchsia_async as fasync;
13use fuchsia_sync::Mutex;
14use futures::stream::FusedStream;
15use futures::task::{Context, Poll};
16use futures::{Stream, StreamExt};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use crate::types::{Error, Result};
21
22/// A software fuchsia audio codec.
23/// Codecs communicate their capabilities to the system and provide control points for components
24/// to start, stop, and configure media capabilities.
25/// See [[Audio Codec Interface]](https://fuchsia.dev/fuchsia-src/concepts/drivers/driver_architectures/audio_drivers/audio_codec)
26pub struct SoftCodec {
27    /// Requests from Media for Codec control (start/stop)
28    codec_requests: Option<CodecRequestStream>,
29    /// Requests from Media if the SignalProcessing Connector is connected
30    signal_processing_requests: Option<SignalProcessingRequestStream>,
31    /// CodecProperties of this device (do not change)
32    properties: fidl_fuchsia_hardware_audio::CodecProperties,
33    /// Formats that are supported by this codec.  These do not change for the lifetime of the
34    /// codec.
35    supported_formats: [fidl_fuchsia_hardware_audio::DaiSupportedFormats; 1],
36    /// Selected format, if it is selected
37    selected_format: Arc<Mutex<Option<fidl_fuchsia_hardware_audio::DaiFormat>>>,
38    /// Codec Start state, including time started/stopped.
39    /// Used to correctly respond to Start / Stop when already started/stopped
40    start_state: Arc<Mutex<StartState>>,
41    /// Plug State publisher
42    plug_state_publisher: hanging_server::Publisher<
43        PlugState,
44        CodecWatchPlugStateResponder,
45        Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
46    >,
47    /// Plug State subscriber
48    plug_state_subscriber: hanging_server::Subscriber<
49        PlugState,
50        CodecWatchPlugStateResponder,
51        Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
52    >,
53    /// True when the codec has terminated, and should not be polled any more.
54    terminated: TerminatedState,
55}
56
/// Termination progress of the [`SoftCodec`] request stream.
///
/// Termination takes two polls: the first yields an error item, the second yields `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum TerminatedState {
    /// The stream is live and may yield more items.
    #[default]
    NotTerminated,
    /// Terminating. We have returned an Err, but not a None yet.
    Terminating,
    /// We have returned a None, no one should poll us again.
    Terminated,
}
66
67#[derive(Debug)]
68pub enum StartState {
69    Stopped(zx::MonotonicInstant),
70    Starting(Option<CodecStartResponder>),
71    Started(zx::MonotonicInstant),
72    Stopping(Option<CodecStopResponder>),
73}
74
75impl Default for StartState {
76    fn default() -> Self {
77        Self::Stopped(fasync::MonotonicInstant::now().into())
78    }
79}
80
/// Direction of audio handled by a codec.
///
/// [`SoftCodec::create`] maps this onto the `is_input` codec property: `Input` and `Output`
/// set it to true / false, `Duplex` leaves it unset.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodecDirection {
    /// The codec is an input.
    Input,
    /// The codec is an output.
    Output,
    /// The codec is both an input and an output.
    Duplex,
}
86
87/// Request from the Audio subsystem to query or perform an action on the codec.
88pub enum CodecRequest {
89    /// Set the format of this codec.
90    SetFormat {
91        /// The format requested.
92        format: fidl_fuchsia_hardware_audio::DaiFormat,
93        /// Responder to be called when the format has been set with Ok(()) or Err(Status) if
94        /// setting the format failed.
95        responder: Box<dyn FnOnce(std::result::Result<(), zx::Status>) + Send>,
96    },
97    /// Start the codec.
98    Start {
99        /// Responder to be called when the codec has been started, providing the time started or
100        /// an error if it failed to start.
101        /// The time is system monotonic when configuring the codec to start completed (it does not
102        /// include startup delay time)
103        /// Replying Err() to this will close the codec, and it will need to be re-instantiated.
104        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
105    },
106    /// Stop the codec.
107    Stop {
108        /// Responder to be called when the codec has been configured to stop.
109        /// On success, provides the time that the codec was configured, not including delays.
110        /// On failuyre, this will close the codec and it will need to be re-instantiated.
111        responder: Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>,
112    },
113}
114
115impl Debug for CodecRequest {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self {
118            Self::SetFormat { format, .. } => {
119                f.debug_struct("SetFormat").field("format", format).finish()
120            }
121            Self::Start { .. } => f.debug_struct("Start").finish(),
122            Self::Stop { .. } => f.debug_struct("Stop").finish(),
123        }
124    }
125}
126
127impl CodecRequest {
128    fn start(state: Arc<Mutex<StartState>>) -> Self {
129        Self::Start {
130            responder: Box::new(move |time| {
131                let mut lock = state.lock();
132                let StartState::Starting(ref mut responder) = *lock else {
133                    log::warn!("Not in starting state, noop response");
134                    return;
135                };
136                let Some(responder) = responder.take() else {
137                    log::warn!("Responder missing for start, noop");
138                    return;
139                };
140                let Ok(time) = time else {
141                    // Some error happened here, we close the channel by dropping the responder.
142                    drop(responder);
143                    return;
144                };
145                *lock = StartState::Started(time);
146                let _ = responder.send(time.into_nanos());
147            }),
148        }
149    }
150
151    fn stop(state: Arc<Mutex<StartState>>) -> Self {
152        Self::Stop {
153            responder: Box::new(move |time| {
154                let mut lock = state.lock();
155                let StartState::Stopping(ref mut responder) = *lock else {
156                    log::warn!("Not in stopping state, noop response");
157                    return;
158                };
159                let Some(responder) = responder.take() else {
160                    log::warn!("Responder missing for stop, noop");
161                    return;
162                };
163                let Ok(time) = time else {
164                    // Some error happened here, we close the channel by dropping the responder.
165                    drop(responder);
166                    return;
167                };
168                *lock = StartState::Stopped(time);
169                let _ = responder.send(time.into_nanos());
170            }),
171        }
172    }
173}
174
175impl SoftCodec {
176    pub fn create(
177        unique_id: Option<&[u8; 16]>,
178        manufacturer: &str,
179        product: &str,
180        direction: CodecDirection,
181        formats: fidl_fuchsia_hardware_audio::DaiSupportedFormats,
182        initially_plugged: bool,
183    ) -> (Self, ClientEnd<CodecMarker>) {
184        let (client, codec_requests) = fidl::endpoints::create_request_stream::<CodecMarker>();
185        let is_input = match direction {
186            CodecDirection::Input => Some(true),
187            CodecDirection::Output => Some(false),
188            CodecDirection::Duplex => None,
189        };
190        let properties = fidl_fuchsia_hardware_audio::CodecProperties {
191            is_input,
192            manufacturer: Some(manufacturer.to_string()),
193            product: Some(product.to_string()),
194            unique_id: unique_id.cloned(),
195            plug_detect_capabilities: Some(
196                fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify,
197            ),
198            ..Default::default()
199        };
200        let plug_state = fidl_fuchsia_hardware_audio::PlugState {
201            plugged: Some(initially_plugged),
202            plug_state_time: Some(fuchsia_async::MonotonicInstant::now().into_nanos()),
203            ..Default::default()
204        };
205        let mut plug_state_server = hanging_server::HangingGet::<
206            _,
207            _,
208            Box<dyn Fn(&PlugState, CodecWatchPlugStateResponder) -> bool>,
209        >::new(
210            plug_state,
211            Box::new(|state, responder: CodecWatchPlugStateResponder| {
212                let _ = responder.send(state);
213                true
214            }),
215        );
216        let plug_state_publisher = plug_state_server.new_publisher();
217        let plug_state_subscriber = plug_state_server.new_subscriber();
218        (
219            Self {
220                codec_requests: Some(codec_requests),
221                signal_processing_requests: Default::default(),
222                properties,
223                supported_formats: [formats],
224                selected_format: Default::default(),
225                start_state: Default::default(),
226                plug_state_publisher,
227                plug_state_subscriber,
228                terminated: Default::default(),
229            },
230            client,
231        )
232    }
233
234    pub fn update_plug_state(&self, plugged: bool) -> Result<()> {
235        self.plug_state_publisher.set(fidl_fuchsia_hardware_audio::PlugState {
236            plugged: Some(plugged),
237            plug_state_time: Some(fasync::MonotonicInstant::now().into_nanos()),
238            ..Default::default()
239        });
240        Ok(())
241    }
242
243    /// Polls the Codec requests, and handles any requests it can, or returns Poll::Ready if the
244    /// request requires a response, generates an event, or an error occurs.
245    fn poll_codec(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
246        let Some(codec_requests) = self.codec_requests.as_mut() else {
247            panic!("Codec requests polled without a request stream");
248        };
249        loop {
250            let request = futures::ready!(codec_requests.poll_next_unpin(cx));
251            let request = match request {
252                None => {
253                    self.terminated = TerminatedState::Terminating;
254                    return Poll::Ready(Some(Err(Error::RequestStreamError(
255                        fidl::Error::ClientRead(zx::Status::PEER_CLOSED.into()),
256                    ))));
257                }
258                Some(Err(e)) => {
259                    self.terminated = TerminatedState::Terminating;
260                    return Poll::Ready(Some(Err(Error::RequestStreamError(e))));
261                }
262                Some(Ok(request)) => request,
263            };
264            use fidl_fuchsia_hardware_audio::CodecRequest::*;
265            log::info!("Handling codec request: {request:?}");
266            match request {
267                GetHealthState { responder } => {
268                    let _ = responder.send(&fidl_fuchsia_hardware_audio::HealthState::default());
269                }
270                SignalProcessingConnect { protocol, .. } => {
271                    if self.signal_processing_requests.is_some() {
272                        let _ = protocol.close_with_epitaph(zx::Status::ALREADY_BOUND);
273                        continue;
274                    }
275                    // TODO(https://fxbug.dev/323576893): Support the Signal Processing connect.
276                    let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
277                }
278                Reset { responder } => {
279                    // TODO(https://fxbug.dev/324247020): Support resetting.
280                    responder.control_handle().shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
281                    continue;
282                }
283                GetProperties { responder } => {
284                    let _ = responder.send(&self.properties);
285                }
286                Start { responder } => {
287                    let mut lock = self.start_state.lock();
288                    match *lock {
289                        StartState::Started(time) => {
290                            let _ = responder.send(time.into_nanos());
291                            continue;
292                        }
293                        StartState::Stopped(_) => {
294                            *lock = StartState::Starting(Some(responder));
295                            return Poll::Ready(Some(Ok(CodecRequest::start(
296                                self.start_state.clone(),
297                            ))));
298                        }
299                        StartState::Starting(ref mut existing_responder) => {
300                            // Started while starting
301                            log::warn!("Got start while starting, closing codec");
302                            drop(responder);
303                            drop(existing_responder.take());
304                        }
305                        _ => {
306                            // Started while stopping
307                            log::warn!("Got start while stopping, closing codec");
308                            drop(responder);
309                        }
310                    }
311                }
312                Stop { responder } => {
313                    let mut lock = self.start_state.lock();
314                    match *lock {
315                        StartState::Stopped(time) => {
316                            let _ = responder.send(time.into_nanos());
317                            continue;
318                        }
319                        StartState::Started(_) => {
320                            *lock = StartState::Stopping(Some(responder));
321                            return Poll::Ready(Some(Ok(CodecRequest::stop(
322                                self.start_state.clone(),
323                            ))));
324                        }
325                        StartState::Stopping(ref mut existing_responder) => {
326                            // Stopped while stopping
327                            log::warn!("Got stop while stopping, closing codec");
328                            drop(responder);
329                            drop(existing_responder.take());
330                        }
331                        _ => {
332                            // Started while stopping.
333                            log::warn!("Got stop while starting, closing codec");
334                            drop(responder);
335                        }
336                    }
337                }
338                GetDaiFormats { responder } => {
339                    let _ = responder.send(Ok(self.supported_formats.as_slice()));
340                }
341                SetDaiFormat { format, responder } => {
342                    let responder = Box::new({
343                        let selected = self.selected_format.clone();
344                        let format = format.clone();
345                        move |result: std::result::Result<(), zx::Status>| {
346                            let _ = match result {
347                                // TODO(https://fxbug.dev/42128949): support specifying delay times
348                                Ok(()) => {
349                                    *selected.lock() = Some(format);
350                                    responder.send(Ok(
351                                        &fidl_fuchsia_hardware_audio::CodecFormatInfo::default(),
352                                    ))
353                                }
354                                Err(s) => responder.send(Err(s.into_raw())),
355                            };
356                        }
357                    });
358                    return Poll::Ready(Some(Ok(CodecRequest::SetFormat { format, responder })));
359                }
360                WatchPlugState { responder } => {
361                    if let Err(_e) = self.plug_state_subscriber.register(responder) {
362                        // Responder will be dropped, closing the protocol.
363                        self.terminated = TerminatedState::Terminating;
364                        return Poll::Ready(Some(Err(Error::PeerError(
365                            "WatchPlugState while hanging".to_string(),
366                        ))));
367                    }
368                }
369            }
370        }
371    }
372
373    /// Polls the SignalProcessing requests if it exists, and handles any requests it can, or returns Poll::Ready
374    /// if the request requires a response, generates an event, or an error occurs.
375    fn poll_signal(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Result<CodecRequest>>> {
376        let Some(_requests) = self.signal_processing_requests.as_mut() else {
377            // The codec will wake the context when SignalProcessing is connected.
378            return Poll::Pending;
379        };
380        // TODO(https://fxbug.dev/323576893): Support Signal Processing
381        Poll::Pending
382    }
383}
384
385impl FusedStream for SoftCodec {
386    fn is_terminated(&self) -> bool {
387        self.terminated == TerminatedState::Terminated
388    }
389}
390
391impl Stream for SoftCodec {
392    type Item = Result<CodecRequest>;
393
394    fn poll_next(
395        mut self: std::pin::Pin<&mut Self>,
396        cx: &mut Context<'_>,
397    ) -> Poll<Option<Self::Item>> {
398        match self.terminated {
399            TerminatedState::Terminating => {
400                self.terminated = TerminatedState::Terminated;
401                self.codec_requests = None;
402                return Poll::Ready(None);
403            }
404            TerminatedState::Terminated => panic!("polled while terminated"),
405            TerminatedState::NotTerminated => {}
406        };
407        if let Poll::Ready(x) = self.poll_codec(cx) {
408            return Poll::Ready(x);
409        }
410        self.poll_signal(cx)
411    }
412}
413
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use async_utils::PollExt;
    use fixture::fixture;

    use fidl_fuchsia_hardware_audio::{
        CodecProxy, DaiFrameFormat, DaiFrameFormatStandard, DaiSampleFormat, DaiSupportedFormats,
    };

    const TEST_UNIQUE_ID: [u8; 16] = [5; 16];
    const TEST_MANUF: &str = "Fuchsia";
    const TEST_PRODUCT: &str = "Test Codec";

    /// The supported formats advertised by every codec created in these tests.
    fn test_supported_formats() -> DaiSupportedFormats {
        DaiSupportedFormats {
            number_of_channels: vec![1, 2],
            sample_formats: vec![DaiSampleFormat::PcmUnsigned],
            // TODO(https://fxbug.dev/278283913): Configuration for which FrameFormatStandard the chip expects
            frame_formats: vec![DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S)],
            frame_rates: vec![48000],
            bits_per_slot: vec![16],
            bits_per_sample: vec![16],
        }
    }

    /// A stereo format taken from `test_supported_formats`, used for SetDaiFormat calls.
    fn test_dai_format() -> fidl_fuchsia_hardware_audio::DaiFormat {
        fidl_fuchsia_hardware_audio::DaiFormat {
            number_of_channels: 2,
            channels_to_use_bitmask: 0x3,
            sample_format: DaiSampleFormat::PcmUnsigned,
            frame_format: DaiFrameFormat::FrameFormatStandard(DaiFrameFormatStandard::I2S),
            frame_rate: 48000,
            bits_per_slot: 16,
            bits_per_sample: 16,
        }
    }

    /// Creates an initially plugged output codec with the test identity and formats.
    /// An executor must already exist when this is called.
    fn create_test_codec() -> (SoftCodec, ClientEnd<CodecMarker>) {
        SoftCodec::create(
            Some(&TEST_UNIQUE_ID),
            TEST_MANUF,
            TEST_PRODUCT,
            CodecDirection::Output,
            test_supported_formats(),
            true,
        )
    }

    /// Test fixture that provides an executor, a proxy connected to a fresh codec, and the
    /// codec itself.
    pub(crate) fn with_soft_codec<F>(_name: &str, test: F)
    where
        F: FnOnce(fasync::TestExecutor, CodecProxy, SoftCodec),
    {
        // The executor is created first, before the codec and its channel.
        let exec = fasync::TestExecutor::new();
        let (codec, client) = create_test_codec();
        test(exec, client.into_proxy(), codec)
    }

    #[fixture(with_soft_codec)]
    #[fuchsia::test]
    fn happy_lifecycle(mut exec: fasync::TestExecutor, proxy: CodecProxy, mut codec: SoftCodec) {
        let mut codec_next_fut = codec.next();
        let mut get_properties_fut = proxy.get_properties();
        // Codec can answer this on its own, this should not complete.
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let properties =
            exec.run_until_stalled(&mut get_properties_fut).expect("finish").expect("ok");

        assert_eq!(properties.unique_id, Some(TEST_UNIQUE_ID));
        assert_eq!(
            properties.plug_detect_capabilities,
            Some(fidl_fuchsia_hardware_audio::PlugDetectCapabilities::CanAsyncNotify)
        );

        let mut formats_fut = proxy.get_dai_formats();
        // Should not be complete
        exec.run_until_stalled(&mut formats_fut)
            .expect_pending("shouldn't finish until codec polled");

        // Codec can answer this on its own, this should not complete.
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        // Should now
        let formats =
            exec.run_singlethreaded(&mut formats_fut).expect("fidl succeed").expect("format ok");
        let Some(formats) = formats.first() else {
            panic!("Expected at least one format");
        };
        assert_eq!(formats.number_of_channels.len(), 2);
        assert_eq!(formats.frame_rates[0], 48000);

        let mut set_format_fut = proxy.set_dai_format(&test_dai_format());

        // should not complete
        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");

        // Setting the format needs the codec implementation, so it surfaces as a request.
        let Some(Ok(CodecRequest::SetFormat { format, responder })) =
            exec.run_singlethreaded(&mut codec_next_fut)
        else {
            panic!("Expected a SetFormat request");
        };

        assert_eq!(format.number_of_channels, 2);
        assert_eq!(format.channels_to_use_bitmask, 0x3);

        exec.run_until_stalled(&mut set_format_fut).expect_pending("should pend");
        responder(Ok(()));

        let Ok(Ok(codec_format_info)) = exec.run_singlethreaded(&mut set_format_fut) else {
            panic!("Expected ok response");
        };

        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());

        let mut codec_next_fut = codec.next();
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let mut start_fut = proxy.start();
        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");

        let Some(Ok(CodecRequest::Start { responder })) =
            exec.run_singlethreaded(&mut codec_next_fut)
        else {
            panic!("Expected a Start request");
        };
        let mut codec_next_fut = codec.next();

        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");

        let started_time = fasync::MonotonicInstant::now();
        responder(Ok(started_time.into()));

        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();

        assert_eq!(started_time.into_nanos(), time);

        // Asking to start again should return the same start time.
        let mut start_fut = proxy.start();
        exec.run_until_stalled(&mut start_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        let time = exec.run_until_stalled(&mut start_fut).expect("should be started").unwrap();
        assert_eq!(started_time.into_nanos(), time);

        let mut watch_plug_state_fut = proxy.watch_plug_state();
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let plug_state =
            exec.run_until_stalled(&mut watch_plug_state_fut).expect("should finish").expect("ok");

        assert_eq!(plug_state.plugged, Some(true));
        assert!(
            plug_state.plug_state_time.unwrap() <= fasync::MonotonicInstant::now().into_nanos()
        );

        // Second watch plug state should hang.
        let mut watch_plug_state_fut = proxy.watch_plug_state();
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        exec.run_until_stalled(&mut watch_plug_state_fut).expect_pending("should pend");

        // Release the mutable borrow of the codec so the plug state can be updated.
        drop(codec_next_fut);

        // Until we send a plug update.
        codec.update_plug_state(false).unwrap();

        let plug_state =
            exec.run_until_stalled(&mut watch_plug_state_fut).expect("done").expect("ok");

        assert_eq!(plug_state.plugged, Some(false));

        let mut codec_next_fut = codec.next();

        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");

        // Check on the health, but there's nothing interesting about the response.
        let mut health_fut = proxy.get_health_state();
        exec.run_until_stalled(&mut health_fut).expect_pending("should pend");
        exec.run_until_stalled(&mut codec_next_fut).expect_pending("no event");
        let _health = exec.run_until_stalled(&mut health_fut).expect("ready").expect("ok");

        let mut stop_fut = proxy.stop();
        let Poll::Ready(Some(Ok(CodecRequest::Stop { responder }))) =
            exec.run_until_stalled(&mut codec_next_fut)
        else {
            panic!("Expected a codec request to stop");
        };
        exec.run_until_stalled(&mut stop_fut).expect_pending("should pend");

        let response_time = fasync::MonotonicInstant::now();
        responder(Ok(response_time.into()));

        let Poll::Ready(Ok(received_time)) = exec.run_until_stalled(&mut stop_fut) else {
            panic!("Expected stop to finish");
        };

        assert_eq!(received_time, response_time.into_nanos());
    }

    #[fuchsia::test]
    async fn started_twice_before_response() {
        let (mut codec, client) = create_test_codec();
        let proxy = client.into_proxy();

        let set_format_fut = proxy.set_dai_format(&test_dai_format());

        // Setting the format needs the codec implementation, so it surfaces as a request.
        let Some(Ok(CodecRequest::SetFormat { format, responder })) = codec.next().await else {
            panic!("Expected a SetFormat request");
        };

        assert_eq!(format.number_of_channels, 2);
        assert_eq!(format.channels_to_use_bitmask, 0x3);

        responder(Ok(()));

        let Ok(Ok(codec_format_info)) = set_format_fut.await else {
            panic!("Expected ok response");
        };

        assert_eq!(codec_format_info, fidl_fuchsia_hardware_audio::CodecFormatInfo::default());

        let start_fut = proxy.start();

        let Some(Ok(CodecRequest::Start { responder })) = codec.next().await else {
            panic!("Expected a Start request");
        };

        let start_before_response_fut = proxy.start();
        // The codec stream closes.
        let codec_next = codec.next().await;
        let Some(Err(_)) = codec_next else {
            panic!("Expected stream to close on bad behavior from proxy: {codec_next:?}");
        };
        let codec_next = codec.next().await;
        let None = codec_next else {
            panic!("Expected stream to terminate after: {codec_next:?}");
        };
        // The start that shut us down should fail
        let start_before_response = start_before_response_fut.await;
        let Err(_) = start_before_response else {
            panic!("Expected error from the second start on the proxy before a response: {start_before_response:?}");
        };

        // The previous start call also will fail.
        let start_response = start_fut.await;
        let Err(_) = start_response else {
            panic!("Expected error from the first start on the proxy before a response: {start_response:?}");
        };
        // The responder should be ok to call even though it has no effect.
        let response_time = fasync::MonotonicInstant::now();
        responder(Ok(response_time.into()));
    }
}