Skip to main content

fuchsia_audio_device/
stream_config.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use async_helpers::maybe_stream::MaybeStream;
7use fidl::endpoints::ClientEnd;
8use fidl::prelude::*;
9use fidl_fuchsia_hardware_audio::*;
10use fuchsia_inspect_derive::{IValue, Inspect};
11use fuchsia_sync::Mutex;
12
13use futures::{StreamExt, select};
14use log::{info, warn};
15use std::sync::Arc;
16use {fuchsia_async as fasync, fuchsia_inspect as inspect};
17
18use crate::audio_frame_sink::AudioFrameSink;
19use crate::audio_frame_stream::AudioFrameStream;
20use crate::frame_vmo;
21use crate::types::{AudioSampleFormat, Error, Result};
22
23pub(crate) enum StreamConfigOrTask {
24    StreamConfig(Box<SoftStreamConfig>),
25    Task(fasync::Task<Result<()>>),
26    Complete,
27}
28
29impl StreamConfigOrTask {
30    /// Start the task if it's not running.
31    /// Does nothing if the task is running or completed.
32    pub(crate) fn start(&mut self) {
33        *self = match std::mem::replace(self, StreamConfigOrTask::Complete) {
34            StreamConfigOrTask::StreamConfig(st) => {
35                StreamConfigOrTask::Task(fasync::Task::spawn(st.process_requests()))
36            }
37            x => x,
38        }
39    }
40}
41
42/// Number of frames within the duration.  This includes frames that end at exactly the duration.
43pub(crate) fn frames_from_duration(
44    frames_per_second: usize,
45    duration: fasync::MonotonicDuration,
46) -> usize {
47    assert!(
48        duration >= zx::MonotonicDuration::from_nanos(0),
49        "frames_from_duration is not defined for negative durations"
50    );
51    let mut frames = duration.into_seconds() * frames_per_second as i64;
52    let frames_partial =
53        ((duration.into_nanos() % 1_000_000_000) as f64 / 1e9) * frames_per_second as f64;
54    frames += frames_partial as i64;
55    frames as usize
56}
57
58/// A software fuchsia audio output, which implements Audio Driver Streaming Interface
59/// as defined in //docs/concepts/drivers/driver_interfaces/audio_streaming.md
60#[derive(Inspect)]
61pub struct SoftStreamConfig {
62    /// The Stream channel handles format negotiation, plug detection, and gain
63    stream_config_stream: StreamConfigRequestStream,
64
65    /// The Unique ID that this stream will present to the system
66    unique_id: [u8; 16],
67    /// The manufacturer of the hardware for this stream
68    manufacturer: String,
69    /// A product description for the hardware for the stream
70    product: String,
71    /// The clock domain that this stream will present to the system
72    clock_domain: u32,
73
74    /// True when this represents an output
75    is_output: bool,
76
77    /// The supported format of this output.
78    /// Currently only support one format per output is supported.
79    supported_formats: PcmSupportedFormats,
80
81    /// The number of audio frames per packet from the frame stream.
82    /// Used to calculate audio buffer sizes.
83    /// If an input, this is the amount of space we reserve for audio frames.
84    packet_frames: usize,
85
86    /// The size of a frame.
87    /// Used to report the driver transfer size.
88    frame_bytes: usize,
89
90    /// The currently set format, in frames per second, audio sample format, and channels.
91    current_format: Option<(u32, AudioSampleFormat, u16)>,
92
93    /// The request stream for the ringbuffer.
94    ring_buffer_stream: MaybeStream<RingBufferRequestStream>,
95
96    /// A pointer to the ring buffer for this stream
97    frame_vmo: Arc<Mutex<frame_vmo::FrameVmo>>,
98
99    /// The current delay that has been communicated exists after the audio is retrieved.
100    external_delay: zx::MonotonicDuration,
101
102    /// Replied to plugged state watch.
103    plug_state_replied: bool,
104
105    /// Replied to gain state watch.
106    gain_state_replied: bool,
107
108    /// Replied to delay info watch.
109    delay_info_replied: bool,
110
111    /// Inspect node
112    #[inspect(forward)]
113    inspect: SoftStreamConfigInspect,
114}
115
116#[derive(Default, Inspect)]
117struct SoftStreamConfigInspect {
118    inspect_node: inspect::Node,
119    ring_buffer_format: IValue<Option<String>>,
120    frame_vmo_status: IValue<Option<String>>,
121}
122
123impl SoftStreamConfigInspect {
124    fn record_current_format(&mut self, current: &(u32, AudioSampleFormat, u16)) {
125        self.ring_buffer_format
126            .iset(Some(format!("{} rate: {} channels: {}", current.1, current.0, current.2)));
127    }
128
129    fn record_vmo_status(&mut self, new: &str) {
130        self.frame_vmo_status.iset(Some(new.to_owned()));
131    }
132}
133
134impl SoftStreamConfig {
135    /// Create a new software audio device, returning a client channel which can be supplied
136    /// to the AudioCore and will act correctly as an audio output driver channel which can
137    /// render audio in the `pcm_format` format, and an AudioFrameStream which produces the
138    /// audio frames delivered to the audio output.
139    /// Spawns a task to handle messages from the Audio Core and setup of internal VMO buffers
140    /// required for audio output.  See AudioFrameStream for more information on timing
141    /// requirements for audio output.
142    /// `packet_duration`: desired duration of an audio packet returned by the stream. Rounded down to
143    /// end on a audio frame boundary.
144    /// `initial_external_delay`: delay that is added after packets have been returned from the stream
145    pub fn create_output(
146        unique_id: &[u8; 16],
147        manufacturer: &str,
148        product: &str,
149        clock_domain: u32,
150        pcm_format: fidl_fuchsia_media::PcmFormat,
151        packet_duration: zx::MonotonicDuration,
152        initial_external_delay: zx::MonotonicDuration,
153    ) -> Result<(ClientEnd<StreamConfigMarker>, AudioFrameStream)> {
154        let (client, soft_stream_config) = SoftStreamConfig::build(
155            unique_id,
156            manufacturer,
157            product,
158            clock_domain,
159            true,
160            pcm_format,
161            packet_duration,
162            initial_external_delay,
163        )?;
164        Ok((client, AudioFrameStream::new(soft_stream_config)))
165    }
166
167    pub fn create_input(
168        unique_id: &[u8; 16],
169        manufacturer: &str,
170        product: &str,
171        clock_domain: u32,
172        pcm_format: fidl_fuchsia_media::PcmFormat,
173        buffer: zx::MonotonicDuration,
174    ) -> Result<(ClientEnd<StreamConfigMarker>, AudioFrameSink)> {
175        let (client, soft_stream_config) = SoftStreamConfig::build(
176            unique_id,
177            manufacturer,
178            product,
179            clock_domain,
180            false,
181            pcm_format,
182            buffer,
183            zx::MonotonicDuration::from_nanos(0),
184        )?;
185        Ok((client, AudioFrameSink::new(soft_stream_config)))
186    }
187
188    fn build(
189        unique_id: &[u8; 16],
190        manufacturer: &str,
191        product: &str,
192        clock_domain: u32,
193        is_output: bool,
194        pcm_format: fidl_fuchsia_media::PcmFormat,
195        packet_duration: zx::MonotonicDuration,
196        initial_external_delay: zx::MonotonicDuration,
197    ) -> Result<(ClientEnd<StreamConfigMarker>, SoftStreamConfig)> {
198        if pcm_format.bits_per_sample % 8 != 0 {
199            // Non-byte-aligned format not allowed.
200            return Err(Error::InvalidArgs);
201        }
202        let (client, request_stream) =
203            fidl::endpoints::create_request_stream::<StreamConfigMarker>();
204
205        let number_of_channels = pcm_format.channel_map.len();
206        let attributes = vec![ChannelAttributes::default(); number_of_channels];
207        let channel_set = ChannelSet { attributes: Some(attributes), ..Default::default() };
208        let supported_formats = PcmSupportedFormats {
209            channel_sets: Some(vec![channel_set]),
210            sample_formats: Some(vec![SampleFormat::PcmSigned]),
211            bytes_per_sample: Some(vec![(pcm_format.bits_per_sample / 8) as u8]),
212            valid_bits_per_sample: Some(vec![pcm_format.bits_per_sample as u8]),
213            frame_rates: Some(vec![pcm_format.frames_per_second]),
214            ..Default::default()
215        };
216
217        let packet_frames =
218            frames_from_duration(pcm_format.frames_per_second as usize, packet_duration);
219
220        let soft_stream_config = SoftStreamConfig {
221            stream_config_stream: request_stream,
222            unique_id: unique_id.clone(),
223            manufacturer: manufacturer.to_string(),
224            product: product.to_string(),
225            is_output,
226            clock_domain,
227            supported_formats,
228            packet_frames,
229            frame_bytes: (pcm_format.bits_per_sample / 8) as usize,
230            current_format: None,
231            ring_buffer_stream: Default::default(),
232            frame_vmo: Arc::new(Mutex::new(frame_vmo::FrameVmo::new()?)),
233            external_delay: initial_external_delay,
234            plug_state_replied: false,
235            gain_state_replied: false,
236            delay_info_replied: false,
237            inspect: Default::default(),
238        };
239        Ok((client, soft_stream_config))
240    }
241
242    pub(crate) fn frame_vmo(&self) -> Arc<Mutex<frame_vmo::FrameVmo>> {
243        self.frame_vmo.clone()
244    }
245
246    pub(crate) fn packet_frames(&self) -> usize {
247        self.packet_frames
248    }
249
250    fn frames_per_second(&self) -> u32 {
251        *self.supported_formats.frame_rates.as_ref().unwrap().get(0).unwrap()
252    }
253
254    /// Delay that is reported to the audio subsystem.
255    /// Includes the buffered packets if this is an output, and the current external delay.
256    fn current_delay(&self) -> zx::MonotonicDuration {
257        let packet_delay_nanos = if self.is_output {
258            (i64::try_from(self.packet_frames).unwrap() * 1_000_000_000)
259                / self.frames_per_second() as i64
260        } else {
261            0
262        };
263        zx::MonotonicDuration::from_nanos(packet_delay_nanos) + self.external_delay
264    }
265
266    async fn process_requests(mut self) -> Result<()> {
267        loop {
268            select! {
269                stream_config_request = self.stream_config_stream.next() => {
270                    match stream_config_request {
271                        Some(Ok(r)) => {
272                            if let Err(e) = self.handle_stream_request(r) {
273                                warn!(e:?; "stream config request")
274                            }
275                        },
276                        Some(Err(e)) => {
277                            warn!(e:?; "stream config error, stopping");
278                            return Err(e.into());
279                        },
280                        None => {
281                            warn!("stream config disconnected, stopping");
282                            return Ok(());
283                        },
284                    }
285                }
286                ring_buffer_request = self.ring_buffer_stream.next() => {
287                    match ring_buffer_request {
288                        Some(Ok(r)) => {
289                            if let Err(e) = self.handle_ring_buffer_request(r) {
290                                warn!(e:?; "ring buffer request")
291                            }
292                        },
293                        Some(Err(e)) => {
294                            warn!(e:?; "ring buffer error, dropping stream");
295                            let _ = MaybeStream::take(&mut self.ring_buffer_stream);
296                        },
297                        None => {
298                            warn!("ring buffer finished, dropping");
299                            let _ = MaybeStream::take(&mut self.ring_buffer_stream);
300                        },
301                    }
302                }
303            }
304        }
305    }
306
307    fn handle_stream_request(
308        &mut self,
309        request: StreamConfigRequest,
310    ) -> std::result::Result<(), anyhow::Error> {
311        match request {
312            StreamConfigRequest::GetHealthState { responder } => {
313                responder.send(&HealthState::default())?;
314            }
315            StreamConfigRequest::SignalProcessingConnect { protocol, control_handle: _ } => {
316                let _ = protocol.close_with_epitaph(zx::Status::NOT_SUPPORTED);
317            }
318            StreamConfigRequest::GetProperties { responder } => {
319                #[rustfmt::skip]
320                let prop = StreamProperties {
321                    unique_id:                Some(self.unique_id),
322                    is_input:                 Some(!self.is_output),
323                    can_mute:                 Some(false),
324                    can_agc:                  Some(false),
325                    min_gain_db:              Some(0f32),
326                    max_gain_db:              Some(0f32),
327                    gain_step_db:             Some(0f32),
328                    plug_detect_capabilities: Some(PlugDetectCapabilities::Hardwired),
329                    clock_domain:             Some(self.clock_domain),
330                    manufacturer:             Some(self.manufacturer.to_string()),
331                    product:                  Some(self.product.to_string()),
332                    ..Default::default()
333                };
334                responder.send(&prop)?;
335            }
336            StreamConfigRequest::GetSupportedFormats { responder } => {
337                let pcm_formats = self.supported_formats.clone();
338                let formats_vector = &[SupportedFormats {
339                    pcm_supported_formats: Some(pcm_formats),
340                    ..Default::default()
341                }];
342                responder.send(formats_vector)?;
343            }
344            StreamConfigRequest::CreateRingBuffer { format, ring_buffer, control_handle: _ } => {
345                let pcm = format.pcm_format.ok_or_else(|| format_err!("No pcm_format included"))?;
346                self.ring_buffer_stream.set(ring_buffer.into_stream());
347                let current = (pcm.frame_rate, pcm.into(), pcm.number_of_channels.into());
348                self.inspect.record_current_format(&current);
349                self.current_format = Some(current);
350                self.delay_info_replied = false;
351            }
352            StreamConfigRequest::WatchGainState { responder } => {
353                if self.gain_state_replied {
354                    // We will never change gain state.
355                    responder.drop_without_shutdown();
356                    return Ok(());
357                }
358                let gain_state = GainState {
359                    muted: Some(false),
360                    agc_enabled: Some(false),
361                    gain_db: Some(0.0f32),
362                    ..Default::default()
363                };
364                responder.send(&gain_state)?;
365                self.gain_state_replied = true
366            }
367            StreamConfigRequest::WatchPlugState { responder } => {
368                if self.plug_state_replied {
369                    // We will never change plug state.
370                    responder.drop_without_shutdown();
371                    return Ok(());
372                }
373                let time = fasync::MonotonicInstant::now();
374                let plug_state = PlugState {
375                    plugged: Some(true),
376                    plug_state_time: Some(time.into_nanos() as i64),
377                    ..Default::default()
378                };
379                responder.send(&plug_state)?;
380                self.plug_state_replied = true;
381            }
382            StreamConfigRequest::SetGain { target_state, control_handle: _ } => {
383                if let Some(true) = target_state.muted {
384                    warn!("Mute is not supported");
385                }
386                if let Some(true) = target_state.agc_enabled {
387                    warn!("AGC is not supported");
388                }
389                if let Some(gain) = target_state.gain_db {
390                    if gain != 0.0 {
391                        warn!("Non-zero gain setting not supported");
392                    }
393                }
394            }
395        }
396        Ok(())
397    }
398
399    fn handle_ring_buffer_request(
400        &mut self,
401        request: RingBufferRequest,
402    ) -> std::result::Result<(), anyhow::Error> {
403        match request {
404            RingBufferRequest::GetProperties { responder } => {
405                let prop = RingBufferProperties {
406                    needs_cache_flush_or_invalidate: Some(false),
407                    // TODO(https://fxbug.dev/42074396): Make driver_transfer_bytes (output) more accurate.
408                    driver_transfer_bytes: Some((self.packet_frames * self.frame_bytes) as u32),
409                    ..Default::default()
410                };
411                responder.send(&prop)?;
412            }
413            RingBufferRequest::GetVmo {
414                min_frames,
415                clock_recovery_notifications_per_ring,
416                responder,
417            } => {
418                let (fps, format, channels) = match &self.current_format {
419                    None => {
420                        if let Err(e) = responder.send(Err(GetVmoError::InternalError)) {
421                            warn!("Error on get vmo error send: {:?}", e);
422                        }
423                        return Ok(());
424                    }
425                    Some(x) => x.clone(),
426                };
427                // Require a minimum amount of frames for three packets.
428                let min_frames_from_duration = 3 * self.packet_frames as u32;
429                let ring_buffer_frames =
430                    (min_frames + self.packet_frames as u32).max(min_frames_from_duration);
431                self.inspect.record_vmo_status("gotten");
432                match self.frame_vmo.lock().set_format(
433                    fps,
434                    format,
435                    channels,
436                    ring_buffer_frames as usize,
437                    clock_recovery_notifications_per_ring,
438                ) {
439                    Err(e) => {
440                        warn!(e:?; "Error on vmo set format");
441                        responder.send(Err(GetVmoError::InternalError))?;
442                    }
443                    Ok(vmo_handle) => {
444                        responder.send(Ok((ring_buffer_frames, vmo_handle)))?;
445                    }
446                }
447            }
448            RingBufferRequest::Start { responder } => {
449                let time = fasync::MonotonicInstant::now();
450                self.inspect.record_vmo_status(&format!("started @ {time:?}"));
451                match self.frame_vmo.lock().start(time.into()) {
452                    Ok(()) => responder.send(time.into_nanos() as i64)?,
453                    Err(e) => {
454                        warn!(e:?; "Error on frame vmo start");
455                        responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
456                    }
457                }
458            }
459            RingBufferRequest::Stop { responder } => match self.frame_vmo.lock().stop() {
460                Ok(stopped) => {
461                    if !stopped {
462                        info!("Stopping an unstarted ring buffer");
463                    }
464                    self.inspect.record_vmo_status(&format!(
465                        "stopped @ {:?}",
466                        fasync::MonotonicInstant::now()
467                    ));
468                    responder.send()?;
469                }
470                Err(e) => {
471                    warn!(e:?; "Error on frame vmo stop");
472                    responder.control_handle().shutdown_with_epitaph(zx::Status::BAD_STATE);
473                }
474            },
475            RingBufferRequest::WatchClockRecoveryPositionInfo { responder } => {
476                self.frame_vmo.lock().set_position_responder(responder);
477            }
478            RingBufferRequest::SetActiveChannels { active_channels_bitmask: _, responder } => {
479                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
480            }
481            RingBufferRequest::WatchDelayInfo { responder } => {
482                if self.delay_info_replied {
483                    // We will never change delay state.
484                    // TODO(https://fxbug.dev/42128949): Reply again when the external_delay changes from
485                    // outside instead of just on startup.
486                    responder.drop_without_shutdown();
487                    return Ok(());
488                }
489                // internal_delay is at least our packet duration (we buffer at least that much)
490                // plus whatever delay has been communicated from the client.
491                let delay_info = DelayInfo {
492                    internal_delay: Some(self.current_delay().into_nanos()),
493                    ..Default::default()
494                };
495                responder.send(&delay_info)?;
496                self.delay_info_replied = true;
497            }
498            RingBufferRequest::_UnknownMethod { .. } => (),
499        }
500        Ok(())
501    }
502}
503
504#[cfg(test)]
505pub(crate) mod tests {
506    use super::*;
507
508    use fidl_fuchsia_media::{AudioChannelId, AudioPcmMode, PcmFormat};
509
510    use async_utils::PollExt;
511    use fixture::fixture;
512    use futures::future;
513    use futures::task::Poll;
514
515    const TEST_UNIQUE_ID: &[u8; 16] = &[5; 16];
516    const TEST_CLOCK_DOMAIN: u32 = 0x00010203;
517
518    pub(crate) fn with_audio_frame_stream<F>(_name: &str, test: F)
519    where
520        F: FnOnce(fasync::TestExecutor, StreamConfigProxy, AudioFrameStream) -> (),
521    {
522        let exec = fasync::TestExecutor::new_with_fake_time();
523        let format = PcmFormat {
524            pcm_mode: AudioPcmMode::Linear,
525            bits_per_sample: 16,
526            frames_per_second: 44100,
527            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
528        };
529        let (client, frame_stream) = SoftStreamConfig::create_output(
530            TEST_UNIQUE_ID,
531            "Google",
532            "UnitTest",
533            TEST_CLOCK_DOMAIN,
534            format,
535            zx::MonotonicDuration::from_millis(100),
536            zx::MonotonicDuration::from_millis(50),
537        )
538        .expect("should always build");
539        test(exec, client.into_proxy(), frame_stream)
540    }
541
542    #[fuchsia::test]
543    fn test_frames_from_duration() {
544        const FPS: usize = 48000;
545        // At 48kHz, each frame is 20833 and 1/3 nanoseconds. We add one nanosecond
546        // because frames need to be completely within the duration.
547        const ONE_FRAME_NANOS: i64 = 20833 + 1;
548        const THREE_FRAME_NANOS: i64 = 20833 * 3 + 1;
549
550        assert_eq!(0, frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(0)));
551
552        assert_eq!(
553            0,
554            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS - 1))
555        );
556        assert_eq!(
557            1,
558            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(ONE_FRAME_NANOS))
559        );
560
561        // Three frames is an exact number of nanoseconds, we should be able to get an exact number
562        // of frames from the duration.
563        assert_eq!(
564            2,
565            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS - 1))
566        );
567        assert_eq!(
568            3,
569            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS))
570        );
571        assert_eq!(
572            3,
573            frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(THREE_FRAME_NANOS + 1))
574        );
575
576        assert_eq!(FPS, frames_from_duration(FPS, zx::MonotonicDuration::from_seconds(1)));
577        assert_eq!(72000, frames_from_duration(FPS, zx::MonotonicDuration::from_millis(1500)));
578
579        assert_eq!(10660, frames_from_duration(FPS, zx::MonotonicDuration::from_nanos(222084000)));
580    }
581
582    #[fuchsia::test]
583    fn soft_stream_config_audio_should_end_when_stream_dropped() {
584        let format = PcmFormat {
585            pcm_mode: AudioPcmMode::Linear,
586            bits_per_sample: 16,
587            frames_per_second: 48000,
588            channel_map: vec![AudioChannelId::Lf, AudioChannelId::Rf],
589        };
590
591        let mut exec = fasync::TestExecutor::new_with_fake_time();
592        let (client, frame_stream) = SoftStreamConfig::build(
593            TEST_UNIQUE_ID,
594            &"Google".to_string(),
595            &"UnitTest".to_string(),
596            TEST_CLOCK_DOMAIN,
597            true,
598            format,
599            zx::MonotonicDuration::from_millis(100),
600            zx::MonotonicDuration::from_millis(50),
601        )
602        .expect("should always build");
603
604        drop(frame_stream);
605
606        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future::pending::<()>()));
607
608        // The audio client should be dropped (normally this causes audio to remove the device)
609        assert_eq!(Err(zx::Status::PEER_CLOSED), client.channel().write(&[0], &mut Vec::new()));
610    }
611
612    // Returns the number of frames that were ready in the stream, draining the stream.
613    fn frames_ready(exec: &mut fasync::TestExecutor, frame_stream: &mut AudioFrameStream) -> usize {
614        let mut frames = 0;
615        while exec.run_until_stalled(&mut frame_stream.next()).is_ready() {
616            frames += 1;
617        }
618        frames
619    }
620
621    #[fixture(with_audio_frame_stream)]
622    #[fuchsia::test]
623    fn send_positions(
624        mut exec: fasync::TestExecutor,
625        stream_config: StreamConfigProxy,
626        mut frame_stream: AudioFrameStream,
627    ) {
628        // Poll the frame stream, which should start the processing of proxy requests.
629        assert_eq!(0, frames_ready(&mut exec, &mut frame_stream));
630        let _stream_config_properties = exec.run_until_stalled(&mut stream_config.get_properties());
631        let _formats = exec.run_until_stalled(&mut stream_config.get_supported_formats());
632        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();
633
634        #[rustfmt::skip]
635        let format = Format {
636            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
637                number_of_channels:      2u8,
638                sample_format:           SampleFormat::PcmSigned,
639                bytes_per_sample:        2u8,
640                valid_bits_per_sample:   16u8,
641                frame_rate:              44100,
642            }),
643            ..Default::default()
644        };
645
646        let result = stream_config.create_ring_buffer(&format, server);
647        assert!(result.is_ok());
648
649        let _ring_buffer_properties = exec.run_until_stalled(&mut ring_buffer.get_properties());
650
651        let some_active_channels_mask = 0xc3u64;
652        let result =
653            exec.run_until_stalled(&mut ring_buffer.set_active_channels(some_active_channels_mask));
654        assert!(result.is_ready());
655        let _ = match result {
656            Poll::Ready(Ok(Err(e))) => assert_eq!(e, zx::Status::NOT_SUPPORTED.into_raw()),
657            x => panic!("Expected error reply to set_active_channels, got {:?}", x),
658        };
659
660        let clock_recovery_notifications_per_ring = 10u32;
661        let _ = exec.run_until_stalled(
662            &mut ring_buffer.get_vmo(88200, clock_recovery_notifications_per_ring),
663        ); // 2 seconds.
664
665        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(42));
666        let _ = exec.wake_expired_timers();
667        let start_time = exec.run_until_stalled(&mut ring_buffer.start());
668        if let Poll::Ready(s) = start_time {
669            assert_eq!(s.expect("start time error"), 42);
670        } else {
671            panic!("start error");
672        }
673
674        // Watch number 1.
675        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
676        let result = exec.run_until_stalled(&mut position_info);
677        assert!(!result.is_ready());
678
679        // Now advance in between notifications, with a 2 seconds total in the ring buffer
680        // and 10 notifications per ring we can get watch notifications every 200 msecs.
681        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
682            201,
683        )));
684        let _ = exec.wake_expired_timers();
685        // Each frame is 100ms, there should be two of them ready now.
686        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
687        let result = exec.run_until_stalled(&mut position_info);
688        assert!(result.is_ready());
689
690        // Watch number 2.
691        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
692        let result = exec.run_until_stalled(&mut position_info);
693        assert!(!result.is_ready());
694        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
695            201,
696        )));
697        let _ = exec.wake_expired_timers();
698        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
699        let result = exec.run_until_stalled(&mut position_info);
700        assert!(result.is_ready());
701
702        // Watch number 3.
703        let mut position_info = ring_buffer.watch_clock_recovery_position_info();
704        let result = exec.run_until_stalled(&mut position_info);
705        assert!(!result.is_ready());
706        exec.set_fake_time(fasync::MonotonicInstant::after(zx::MonotonicDuration::from_millis(
707            201,
708        )));
709        let _ = exec.wake_expired_timers();
710        assert_eq!(2, frames_ready(&mut exec, &mut frame_stream));
711        let result = exec.run_until_stalled(&mut position_info);
712        assert!(result.is_ready());
713
714        let result = exec.run_until_stalled(&mut ring_buffer.stop());
715        assert!(result.is_ready());
716    }
717
718    #[fixture(with_audio_frame_stream)]
719    #[fuchsia::test]
720    fn watch_delay_info(
721        mut exec: fasync::TestExecutor,
722        stream_config: StreamConfigProxy,
723        mut frame_stream: AudioFrameStream,
724    ) {
725        let mut frame_fut = frame_stream.next();
726        // Poll the frame stream, which should start the processing of proxy requests.
727        exec.run_until_stalled(&mut frame_fut).expect_pending("no frames at the start");
728        let _stream_config_properties = exec.run_until_stalled(&mut stream_config.get_properties());
729        let _formats = exec.run_until_stalled(&mut stream_config.get_supported_formats());
730        let (ring_buffer, server) = fidl::endpoints::create_proxy::<RingBufferMarker>();
731
732        #[rustfmt::skip]
733        let format = Format {
734            pcm_format: Some(fidl_fuchsia_hardware_audio::PcmFormat {
735                number_of_channels:      2u8,
736                sample_format:           SampleFormat::PcmSigned,
737                bytes_per_sample:        2u8,
738                valid_bits_per_sample:   16u8,
739                frame_rate:              44100,
740            }),
741            ..Default::default()
742        };
743
744        let result = stream_config.create_ring_buffer(&format, server);
745        assert!(result.is_ok());
746
747        let result = exec.run_until_stalled(&mut ring_buffer.watch_delay_info());
748
749        // Should account for the external_delay here.
750        match result {
751            Poll::Ready(Ok(DelayInfo { internal_delay: Some(x), .. })) => {
752                assert_eq!(zx::MonotonicDuration::from_millis(150).into_nanos(), x)
753            }
754            other => panic!("Expected the correct delay info, got {other:?}"),
755        }
756    }
757}