fuchsia_audio_device/
stream_config.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use async_helpers::maybe_stream::MaybeStream;
7use fidl::endpoints::ClientEnd;
8use fidl::prelude::*;
9use fidl_fuchsia_hardware_audio::*;
10use fuchsia_inspect_derive::{IValue, Inspect};
11use fuchsia_sync::Mutex;
12
13use futures::{select, StreamExt};
14use log::{info, warn};
15use std::sync::Arc;
16use {fuchsia_async as fasync, fuchsia_inspect as inspect};
17
18use crate::audio_frame_sink::AudioFrameSink;
19use crate::audio_frame_stream::AudioFrameStream;
20use crate::frame_vmo;
21use crate::types::{AudioSampleFormat, Error, Result};
22
/// Holds a `SoftStreamConfig` through its lifecycle: not yet started, running as a spawned
/// task, or finished.
#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087115)
pub(crate) enum StreamConfigOrTask {
    /// A stream config whose request-processing task has not been spawned yet.
    StreamConfig(SoftStreamConfig),
    /// The spawned task that is running `SoftStreamConfig::process_requests`.
    Task(fasync::Task<Result<()>>),
    /// Nothing left to run. Also used by `start` as the temporary placeholder while it swaps
    /// the current state out.
    Complete,
}
29
30impl StreamConfigOrTask {
31    /// Start the task if it's not running.
32    /// Does nothing if the task is running or completed.
33    pub(crate) fn start(&mut self) {
34        *self = match std::mem::replace(self, StreamConfigOrTask::Complete) {
35            StreamConfigOrTask::StreamConfig(st) => {
36                StreamConfigOrTask::Task(fasync::Task::spawn(st.process_requests()))
37            }
38            x => x,
39        }
40    }
41}
42
43/// Number of frames within the duration.  This includes frames that end at exactly the duration.
44pub(crate) fn frames_from_duration(
45    frames_per_second: usize,
46    duration: fasync::MonotonicDuration,
47) -> usize {
48    assert!(
49        duration >= zx::MonotonicDuration::from_nanos(0),
50        "frames_from_duration is not defined for negative durations"
51    );
52    let mut frames = duration.into_seconds() * frames_per_second as i64;
53    let frames_partial =
54        ((duration.into_nanos() % 1_000_000_000) as f64 / 1e9) * frames_per_second as f64;
55    frames += frames_partial as i64;
56    frames as usize
57}
58
/// A software fuchsia audio device (output or input), which implements the Audio Driver
/// Streaming Interface as defined in //docs/concepts/drivers/driver_interfaces/audio_streaming.md
#[derive(Inspect)]
pub struct SoftStreamConfig {
    /// The Stream channel handles format negotiation, plug detection, and gain
    stream_config_stream: StreamConfigRequestStream,

    /// The Unique ID that this stream will present to the system
    unique_id: [u8; 16],
    /// The manufacturer of the hardware for this stream
    manufacturer: String,
    /// A product description for the hardware for the stream
    product: String,
    /// The clock domain that this stream will present to the system
    clock_domain: u32,

    /// True when this represents an output
    is_output: bool,

    /// The supported format of this stream.
    /// Currently only one format per stream is supported.
    supported_formats: PcmSupportedFormats,

    /// The number of audio frames per packet from the frame stream.
    /// Used to calculate audio buffer sizes.
    /// If an input, this is the amount of space we reserve for audio frames.
    packet_frames: usize,

    /// The size of a frame, in bytes.
    /// Used to report the driver transfer size.
    frame_bytes: usize,

    /// The currently set format, in frames per second, audio sample format, and channels.
    /// `None` until a ring buffer has been created.
    current_format: Option<(u32, AudioSampleFormat, u16)>,

    /// The request stream for the ringbuffer.
    ring_buffer_stream: MaybeStream<RingBufferRequestStream>,

    /// A pointer to the ring buffer for this stream
    frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,

    /// The delay that has been communicated as existing after the audio is retrieved.
    external_delay: zx::MonotonicDuration,

    /// Replied to plugged state watch.
    plug_state_replied: bool,

    /// Replied to gain state watch.
    gain_state_replied: bool,

    /// Replied to delay info watch. Reset whenever a new ring buffer is created.
    delay_info_replied: bool,

    /// Inspect node
    #[inspect(forward)]
    inspect: SoftStreamConfigInspect,
}
116
/// Inspect data published for a `SoftStreamConfig`.
#[derive(Default, Inspect)]
struct SoftStreamConfigInspect {
    /// The inspect node for this data.
    inspect_node: inspect::Node,
    /// Description of the most recently recorded ring buffer format, if any.
    ring_buffer_format: IValue<Option<String>>,
    /// Most recently recorded status of the frame VMO, if any.
    frame_vmo_status: IValue<Option<String>>,
}
123
124impl SoftStreamConfigInspect {
125    fn record_current_format(&mut self, current: &(u32, AudioSampleFormat, u16)) {
126        self.ring_buffer_format
127            .iset(Some(format!("{} rate: {} channels: {}", current.1, current.0, current.2)));
128    }
129
130    fn record_vmo_status(&mut self, new: &str) {
131        self.frame_vmo_status.iset(Some(new.to_owned()));
132    }
133}
134
135impl SoftStreamConfig {
136    /// Create a new software audio device, returning a client channel which can be supplied
137    /// to the AudioCore and will act correctly as an audio output driver channel which can
138    /// render audio in the `pcm_format` format, and an AudioFrameStream which produces the
139    /// audio frames delivered to the audio output.
140    /// Spawns a task to handle messages from the Audio Core and setup of internal VMO buffers
141    /// required for audio output.  See AudioFrameStream for more information on timing
142    /// requirements for audio output.
143    /// `packet_duration`: desired duration of an audio packet returned by the stream. Rounded down to
144    /// end on a audio frame boundary.
145    /// `initial_external_delay`: delay that is added after packets have been returned from the stream
146    pub fn create_output(
147        unique_id: &[u8; 16],
148        manufacturer: &str,
149        product: &str,
150        clock_domain: u32,
151        pcm_format: fidl_fuchsia_media::PcmFormat,
152        packet_duration: zx::MonotonicDuration,
153        initial_external_delay: zx::MonotonicDuration,
154    ) -> Result<(ClientEnd<StreamConfigMarker>, AudioFrameStream)> {
155        let (client, soft_stream_config) = SoftStreamConfig::build(
156            unique_id,
157            manufacturer,
158            product,
159            clock_domain,
160            true,
161            pcm_format,
162            packet_duration,
163            initial_external_delay,
164        )?;
165        Ok((client, AudioFrameStream::new(soft_stream_config)))
166    }
167
168    pub fn create_input(
169        unique_id: &[u8; 16],
170        manufacturer: &str,
171        product: &str,
172        clock_domain: u32,
173        pcm_format: fidl_fuchsia_media::PcmFormat,
174        buffer: zx::MonotonicDuration,
175    ) -> Result<(ClientEnd<StreamConfigMarker>, AudioFrameSink)> {
176        let (client, soft_stream_config) = SoftStreamConfig::build(
177            unique_id,
178            manufacturer,
179            product,
180            clock_domain,
181            false,
182            pcm_format,
183            buffer,
184            zx::MonotonicDuration::from_nanos(0),
185        )?;
186        Ok((client, AudioFrameSink::new(soft_stream_config)))
187    }
188
189    fn build(
190        unique_id: &[u8; 16],
191        manufacturer: &str,
192        product: &str,
193        clock_domain: u32,
194        is_output: bool,
195        pcm_format: fidl_fuchsia_media::PcmFormat,
196        packet_duration: zx::MonotonicDuration,
197        initial_external_delay: zx::MonotonicDuration,
198    ) -> Result<(ClientEnd<StreamConfigMarker>, SoftStreamConfig)> {
199        if pcm_format.bits_per_sample % 8 != 0 {
200            // Non-byte-aligned format not allowed.
201            return Err(Error::InvalidArgs);
202        }
203        let (client, request_stream) =
204            fidl::endpoints::create_request_stream::<StreamConfigMarker>();
205
206        let number_of_channels = pcm_format.channel_map.len();
207        let attributes = vec![ChannelAttributes::default(); number_of_channels];
208        let channel_set = ChannelSet { attributes: Some(attributes), ..Default::default() };
209        let supported_formats = PcmSupportedFormats {
210            channel_sets: Some(vec![channel_set]),
211            sample_formats: Some(vec![SampleFormat::PcmSigned]),
212            bytes_per_sample: Some(vec![(pcm_format.bits_per_sample / 8) as u8]),
213            valid_bits_per_sample: Some(vec![pcm_format.bits_per_sample as u8]),
214            frame_rates: Some(vec![pcm_format.frames_per_second]),
215            ..Default::default()
216        };
217
218        let packet_frames =
219            frames_from_duration(pcm_format.frames_per_second as usize, packet_duration);
220
221        let soft_stream_config = SoftStreamConfig {
222            stream_config_stream: request_stream,
223            unique_id: unique_id.clone(),
224            manufacturer: manufacturer.to_string(),
225            product: product.to_string(),
226            is_output,
227            clock_domain,
228            supported_formats,
229            packet_frames,
230            frame_bytes: (pcm_format.bits_per_sample / 8) as usize,
231            current_format: None,
232            ring_buffer_stream: Default::default(),
233            frame_vmo: Arc::new(Mutex::new(frame_vmo::FrameVmo::new()?)),
234            external_delay: initial_external_delay,
235            plug_state_replied: false,
236            gain_state_replied: false,
237            delay_info_replied: false,
238            inspect: Default::default(),
239        };
240        Ok((client, soft_stream_config))
241    }
242
243    pub(crate) fn frame_vmo(&self) -> Arc<Mutex<frame_vmo::FrameVmo>> {
244        self.frame_vmo.clone()
245    }
246
    /// Number of audio frames per packet, as derived from the packet duration when this
    /// stream config was built.
    pub(crate) fn packet_frames(&self) -> usize {
        self.packet_frames
    }
250
251    fn frames_per_second(&self) -> u32 {
252        *self.supported_formats.frame_rates.as_ref().unwrap().get(0).unwrap()
253    }
254
255    /// Delay that is reported to the audio subsystem.
256    /// Includes the buffered packets if this is an output, and the current external delay.
257    fn current_delay(&self) -> zx::MonotonicDuration {
258        let packet_delay_nanos = if self.is_output {
259            (i64::try_from(self.packet_frames).unwrap() * 1_000_000_000)
260                / self.frames_per_second() as i64
261        } else {
262            0
263        };
264        zx::MonotonicDuration::from_nanos(packet_delay_nanos) + self.external_delay
265    }
266
267    async fn process_requests(mut self) -> Result<()> {
268        loop {
269            select! {
270                stream_config_request = self.stream_config_stream.next() => {
271                    match stream_config_request {
272                        Some(Ok(r)) => {
273                            if let Err(e) = self.handle_stream_request(r) {
274                                warn!(e:?; "stream config request")
275                            }
276                        },
277                        Some(Err(e)) => {
278                            warn!(e:?; "stream config error, stopping");
279                            return Err(e.into());
280                        },
281                        None => {
282                            warn!("stream config disconnected, stopping");
283                            return Ok(());
284                        },
285                    }
286                }
287                ring_buffer_request = self.ring_buffer_stream.next() => {
288                    match ring_buffer_request {
289                        Some(Ok(r)) => {
290                            if let Err(e) = self.handle_ring_buffer_request(r) {
291                                warn!(e:?; "ring buffer request")
292                            }
293                        },
294                        Some(Err(e)) => {
295                            warn!(e:?; "ring buffer error, dropping stream");
296                            let _ = MaybeStream::take(&mut self.ring_buffer_stream);
297                        },
298                        None => {
299                            warn!("ring buffer finished, dropping");
300                            let _ = MaybeStream::take(&mut self.ring_buffer_stream);
301                        },
302                    }
303                }
304            }
305        }
306    }
307
308    fn handle_stream_request(
309        &mut self,
310        request: StreamConfigRequest,
311    ) -> std::result::Result<(), anyhow::Error> {
312        match request {
313            StreamConfigRequest::GetHealthState { responder } => {
314                responder.send(&HealthState::default())?;
315            }
316            StreamConfigRequest::SignalProcessingConnect { protocol, control_handle: _ } => {
317                let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
318            }
319            StreamConfigRequest::GetProperties { responder } => {
320                #[rustfmt::skip]
321                let prop = StreamProperties {
322                    unique_id:                Some(self.unique_id),
323                    is_input:                 Some(!self.is_output),
324                    can_mute:                 Some(false),
325                    can_agc:                  Some(false),
326                    min_gain_db:              Some(0f32),
327                    max_gain_db:              Some(0f32),
328                    gain_step_db:             Some(0f32),
329                    plug_detect_capabilities: Some(PlugDetectCapabilities::Hardwired),
330                    clock_domain:             Some(self.clock_domain),
331                    manufacturer:             Some(self.manufacturer.to_string()),
332                    product:                  Some(self.product.to_string()),
333                    ..Default::default()
334                };
335                responder.send(&prop)?;
336            }
337            StreamConfigRequest::GetSupportedFormats { responder } => {
338                let pcm_formats = self.supported_formats.clone();
339                let formats_vector = &[SupportedFormats {
340                    pcm_supported_formats: Some(pcm_formats),
341                    ..Default::default()
342                }];
343                responder.send(formats_vector)?;
344            }
345            StreamConfigRequest::CreateRingBuffer { format, ring_buffer, control_handle: _ } => {
346                let pcm = format.pcm_format.ok_or_else(|| format_err!("No pcm_format included"))?;
347                self.ring_buffer_stream.set(ring_buffer.into_stream());
348                let current = (pcm.frame_rate, pcm.into(), pcm.number_of_channels.into());
349                self.inspect.record_current_format(&current);
350                self.current_format = Some(current);
351                self.delay_info_replied = false;
352            }
353            StreamConfigRequest::WatchGainState { responder } => {
354                if self.gain_state_replied {
355                    // We will never change gain state.
356                    responder.drop_without_shutdown();
357                    return Ok(());
358                }
359                let gain_state = GainState {
360                    muted: Some(false),
361                    agc_enabled: Some(false),
362                    gain_db: Some(0.0f32),
363                    ..Default::default()
364                };
365                responder.send(&gain_state)?;
366                self.gain_state_replied = true
367            }
368            StreamConfigRequest::WatchPlugState { responder } => {
369                if self.plug_state_replied {
370                    // We will never change plug state.
371                    responder.drop_without_shutdown();
372                    return Ok(());
373                }
374                let time = fasync::MonotonicInstant::now();
375                let plug_state = PlugState {
376                    plugged: Some(true),
377                    plug_state_time: Some(time.into_nanos() as i64),
378                    ..Default::default()
379                };
380                responder.send(&plug_state)?;
381                self.plug_state_replied = true;
382            }
383            StreamConfigRequest::SetGain { target_state, control_handle: _ } => {
384                if let Some(true) = target_state.muted {
385                    warn!("Mute is not supported");
386                }
387                if let Some(true) = target_state.agc_enabled {
388                    warn!("AGC is not supported");
389                }
390                if let Some(gain) = target_state.gain_db {
391                    if gain != 0.0 {
392                        warn!("Non-zero gain setting not supported");
393                    }
394                }
395            }
396        }
397        Ok(())
398    }
399
400    fn handle_ring_buffer_request(
401        &mut self,
402        request: RingBufferRequest,
403    ) -> std::result::Result<(), anyhow::Error> {
404        match request {
405            RingBufferRequest::GetProperties { responder } => {
406                let prop = RingBufferProperties {
407                    needs_cache_flush_or_invalidate: Some(false),
408                    // TODO(https://fxbug.dev/42074396): Make driver_transfer_bytes (output) more accurate.
409                    driver_transfer_bytes: Some((self.packet_frames * self.frame_bytes) as u32),
410                    ..Default::default()
411                };
412                responder.send(&prop)?;
413            }
414            RingBufferRequest::GetVmo {
415                min_frames,
416                clock_recovery_notifications_per_ring,
417                responder,
418            } => {
419                let (fps, format, channels) = match &self.current_format {
420                    None => {
421                        if let Err(e) = responder.send(Err(GetVmoError::InternalError)) {
422                            warn!("Error on get vmo error send: {:?}", e);
423                        }
424                        return Ok(());
425                    }
426                    Some(x) => x.clone(),
427                };
428                // Require a minimum amount of frames for three packets.
429                let min_frames_from_duration = 3 * self.packet_frames as u32;
430                let ring_buffer_frames =
431                    (min_frames + self.packet_frames as u32).max(min_frames_from_duration);
432                self.inspect.record_vmo_status("gotten");
433                match self.frame_vmo.lock().set_format(
434                    fps,
435                    format,
436                    channels,
437                    ring_buffer_frames as usize,
438                    clock_recovery_notifications_per_ring,
439                ) {
440                    Err(e) => {
441                        warn!(e:?; "Error on vmo set format");
442                        responder.send(Err(GetVmoError::InternalError))?;
443                    }
444                    Ok(vmo_handle) => {
445                        responder.send(Ok((ring_buffer_frames, vmo_handle)))?;
446                    }
447                }
448            }
449            RingBufferRequest::Start { responder } => {
450                let time = fasync::MonotonicInstant::now();
451                self.inspect.record_vmo_status(&format!("started @ {time:?}"));
452                match self.frame_vmo.lock().start(time.into()) {
453                    Ok(()) => responder.send(time.into_nanos() as i64)?,
454                    Err(e) => {
455                        warn!(e:?; "Error on frame vmo start");
456                        responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
457                    }
458                }
459            }
460            RingBufferRequest::Stop { responder } => match self.frame_vmo.lock().stop() {
461                Ok(stopped) => {
462                    if !stopped {
463                        info!("Stopping an unstarted ring buffer");
464                    }
465                    self.inspect.record_vmo_status(&format!(
466                        "stopped @ {:?}",
467                        fasync::MonotonicInstant::now()
468                    ));
469                    responder.send()?;
470                }
471                Err(e) => {
472                    warn!(e:?; "Error on frame vmo stop");
473                    responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
474                }
475            },
476            RingBufferRequest::WatchClockRecoveryPositionInfo { responder } => {
477                self.frame_vmo.lock().set_position_responder(responder);
478            }
479            RingBufferRequest::SetActiveChannels { active_channels_bitmask: _, responder } => {
480                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
481            }
482            RingBufferRequest::WatchDelayInfo { responder } => {
483                if self.delay_info_replied {
484                    // We will never change delay state.
485                    // TODO(https://fxbug.dev/42128949): Reply again when the external_delay changes from
486                    // outside instead of just on startup.
487                    responder.drop_without_shutdown();
488                    return Ok(());
489                }
490                // internal_delay is at least our packet duration (we buffer at least that much)
491                // plus whatever delay has been communicated from the client.
492                let delay_info = DelayInfo {
493                    internal_delay: Some(self.current_delay().into_nanos()),
494                    ..Default::default()
495                };
496                responder.send(&delay_info)?;
497                self.delay_info_replied = true;
498            }
499            RingBufferRequest::_UnknownMethod { .. } => (),
500        }
501        Ok(())
502    }
503}
504
505#[cfg(test)]
506pub(crate) mod tests {
507    use super::*;
508
509    use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat};
510
511    use async_utils::PollExt;
512    use fixture::fixture;
513    use futures::future;
514    use futures::task::Poll;
515
516    const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
517    const TEST_CLOCK_DOMAIN: u32 = 0x00010203;
518
519    pub(crate) fn with_audio_frame_stream<F>(_name: &str, test: F)
520    where
521        F: FnOnce(fasync::TestExecutor, StreamConfigProxy, AudioFrameStream) -> (),
522    {
523        let exec = fasync::TestExecutor::new_with_fake_time();
524        let format = PcmFormat {
525            pcm_mode: AudioPcmMode::Linear,
526            bits_per_sample: 16,
527            frames_per_second: 44100,
528            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
529        };
530        let (client, frame_stream) = SoftStreamConfig::create_output(
531            TEST_UNIQUE_ID,
532            "Google",
533            "UnitTest",
534            TEST_CLOCK_DOMAIN,
535            format,
536            zx::MonotonicDuration::from_millis(100),
537            zx::MonotonicDuration::from_millis(50),
538        )
539        .expect("should always build");
540        test(exec, client.into_proxy(), frame_stream)
541    }
542
543    #[fuchsia::test]
544    fn test_frames_from_duration() {
545        const FPS: usize = 48000;
546        // At 48kHz, each frame is 20833 and 1/3 nanoseconds. We add one nanosecond
547        // because frames need to be completely within the duration.
548        const ONE_FRAME_NANOS: i64 = 20833 + 1;
549        const THREE_FRAME_NANOS: i64 = 20833 * 3 + 1;
550
551        assert_eq!(0, frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(0)));
552
553        assert_eq!(
554            0,
555            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS - 1))
556        );
557        assert_eq!(
558            1,
559            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS))
560        );
561
562        // Three frames is an exact number of nanoseconds, we should be able to get an exact number
563        // of frames from the duration.
564        assert_eq!(
565            2,
566            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1))
567        );
568        assert_eq!(
569            3,
570            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
571        );
572        assert_eq!(
573            3,
574            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS + 1))
575        );
576
577        assert_eq!(FPS, frames_from_duration(FPS, zx::MonotonicDuration::from_seconds(1)));
578        assert_eq!(72000, frames_from_duration(FPS, zx::MonotonicDuration::from_millis(1500)));
579
580        assert_eq!(10660, frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(222084000)));
581    }
582
583    #[fuchsia::test]
584    fn soft_stream_config_audio_should_end_when_stream_dropped() {
585        let format = PcmFormat {
586            pcm_mode: AudioPcmMode::Linear,
587            bits_per_sample: 16,
588            frames_per_second: 48000,
589            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
590        };
591
592        let mut exec = fasync::TestExecutor::new_with_fake_time();
593        let (client, frame_stream) = SoftStreamConfig::build(
594            TEST_UNIQUE_ID,
595            &"Google".to_string(),
596            &"UnitTest".to_string(),
597            TEST_CLOCK_DOMAIN,
598            true,
599            format,
600            zx::MonotonicDuration::from_millis(100),
601            zx::MonotonicDuration::from_millis(50),
602        )
603        .expect("should always build");
604
605        drop(frame_stream);
606
607        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future::pending::<()>()));
608
609        // The audio client should be dropped (normally this causes audio to remove the device)
610        assert_eq!(Err(zx::Status::PEER_CLOSED), client.channel().write(&[0], &mut Vec::new()));
611    }
612
613    // Returns the number of frames that were ready in the stream, draining the stream.
614    fn frames_ready(exec: &mut fasync::TestExecutor, frame_stream: &mut AudioFrameStream) -> usize {
615        let mut frames = 0;
616        while exec.run_until_stalled(&mut frame_stream.next()).is_ready() {
617            frames += 1;
618        }
619        frames
620    }
621
622    #[fixture(with_audio_frame_stream)]
623    #[fuchsia::test]
624    fn send_positions(
625        mut exec: fasync::TestExecutor,
626        stream_config: StreamConfigProxy,
627        mut frame_stream: AudioFrameStream,
628    ) {
629        // Poll the frame stream, which should start the processing of proxy requests.
630        assert_eq!(0, frames_ready(&mut exec, &mut frame_stream));
631        let _stream_config_properties = exec.run_until_stalled(&mut stream_config.get_properties());
632        let _formats = exec.run_until_stalled(&mut stream_config.get_supported_formats());
633        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();
634
635        #[rustfmt::skip]
636        let format = Format {
637            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
638                number_of_channels:      2u8,
639                sample_format:           SampleFormat::PcmSigned,
640                bytes_per_sample:        2u8,
641                valid_bits_per_sample:   16u8,
642                frame_rate:              44100,
643            }),
644            ..Default::default()
645        };
646
647        let result = stream_config.create_ring_buffer(&format, server);
648        assert!(result.is_ok());
649
650        let _ring_buffer_properties = exec.run_until_stalled(&mut ring_buffer.get_properties());
651
652        let some_active_channels_mask = 0xc3u64;
653        let result =
654            exec.run_until_stalled(&mut ring_buffer.set_active_channels(some_active_channels_mask));
655        assert!(result.is_ready());
656        let _ = match result {
657            Poll::Ready(Ok(Err(e))) => assert_eq!(e, zx::Status::NOT_SUPPORTED.into_raw()),
658            x => panic!("Expected error reply to set_active_channels, got {:?}", x),
659        };
660
661        let clock_recovery_notifications_per_ring = 10u32;
662        let _ = exec.run_until_stalled(
663            &mut ring_buffer.get_vmo(88200, clock_recovery_notifications_per_ring),
664        ); // 2 seconds.
665
666        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
667        let _ = exec.wake_expired_timers();
668        let start_time = exec.run_until_stalled(&mut ring_buffer.start());
669        if let Poll::Ready(s) = start_time {
670            assert_eq!(s.expect("start time error"), 42);
671        } else {
672            panic!("start error");
673        }
674
675        // Watch number 1.
676        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
677        let result = exec.run_until_stalled(&mut position_info);
678        assert!(!result.is_ready());
679
680        // Now advance in between notifications, with a 2 seconds total in the ring buffer
681        // and 10 notifications per ring we can get watch notifications every 200 msecs.
682        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
683            201,
684        )));
685        let _ = exec.wake_expired_timers();
686        // Each frame is 100ms, there should be two of them ready now.
687        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
688        let result = exec.run_until_stalled(&mut position_info);
689        assert!(result.is_ready());
690
691        // Watch number 2.
692        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
693        let result = exec.run_until_stalled(&mut position_info);
694        assert!(!result.is_ready());
695        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
696            201,
697        )));
698        let _ = exec.wake_expired_timers();
699        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
700        let result = exec.run_until_stalled(&mut position_info);
701        assert!(result.is_ready());
702
703        // Watch number 3.
704        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
705        let result = exec.run_until_stalled(&mut position_info);
706        assert!(!result.is_ready());
707        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
708            201,
709        )));
710        let _ = exec.wake_expired_timers();
711        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
712        let result = exec.run_until_stalled(&mut position_info);
713        assert!(result.is_ready());
714
715        let result = exec.run_until_stalled(&mut ring_buffer.stop());
716        assert!(result.is_ready());
717    }
718
719    #[fixture(with_audio_frame_stream)]
720    #[fuchsia::test]
721    fn watch_delay_info(
722        mut exec: fasync::TestExecutor,
723        stream_config: StreamConfigProxy,
724        mut frame_stream: AudioFrameStream,
725    ) {
726        let mut frame_fut = frame_stream.next();
727        // Poll the frame stream, which should start the processing of proxy requests.
728        exec.run_until_stalled(&mut frame_fut).expect_pending("no frames at the start");
729        let _stream_config_properties = exec.run_until_stalled(&mut stream_config.get_properties());
730        let _formats = exec.run_until_stalled(&mut stream_config.get_supported_formats());
731        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();
732
733        #[rustfmt::skip]
734        let format = Format {
735            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
736                number_of_channels:      2u8,
737                sample_format:           SampleFormat::PcmSigned,
738                bytes_per_sample:        2u8,
739                valid_bits_per_sample:   16u8,
740                frame_rate:              44100,
741            }),
742            ..Default::default()
743        };
744
745        let result = stream_config.create_ring_buffer(&format, server);
746        assert!(result.is_ok());
747
748        let result = exec.run_until_stalled(&mut ring_buffer.watch_delay_info());
749
750        // Should account for the external_delay here.
751        match result {
752            Poll::Ready(Ok(DelayInfo { internal_delay: Some(x), .. })) => {
753                assert_eq!(zx::MonotonicDuration::from_millis(150).into_nanos(), x)
754            }
755            other => panic!("Expected the correct delay info, got {other:?}"),
756        }
757    }
758}