Skip to main content

fuchsia_audio_codec/
stream_processor.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context as _, Error, format_err};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{MaybeDone, maybe_done};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{Future, StreamExt, ready};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use zx::StatusExt;
23
24use crate::buffer_collection_constraints::buffer_collection_constraints_default;
25use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
26
27fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
28    io::Error::other(format_err!("Fidl Error: {}", e))
29}
30
/// Listener is a three-valued Option that captures the waker that a listener needs to be woken
/// upon when it polls the future instead of at registration time.
///
/// The default value is [`Listener::None`].
#[derive(Debug, Default)]
enum Listener {
    /// No one is listening.
    #[default]
    None,
    /// Someone is listening, but they have either been woken and not yet repolled, or have never
    /// polled.
    New,
    /// Someone is listening, and can be woken with the waker.
    Some(Waker),
}

impl Listener {
    /// Adds a waker to be awoken with `Listener::wake`, replacing any previously stored waker.
    ///
    /// # Panics
    ///
    /// Panics if no one is listening (the listener is `Listener::None`).
    fn register(&mut self, waker: Waker) {
        if matches!(self, Listener::None) {
            panic!("Polled a listener with no pollers");
        }
        *self = Listener::Some(waker);
    }

    /// If a listener has polled, wake the listener and replace it with `New`.
    /// Noop if no one is listening; a `New` listener stays `New` since it has no waker yet.
    fn wake(&mut self) {
        if matches!(self, Listener::None) {
            return;
        }
        // Only `New` or `Some` can be here; `New` has nothing to wake.
        if let Listener::Some(waker) = mem::replace(self, Listener::New) {
            waker.wake();
        }
    }

    /// Get a reference to the waker, if there is one waiting.
    fn waker(&self) -> Option<&Waker> {
        match self {
            Listener::Some(waker) => Some(waker),
            Listener::None | Listener::New => None,
        }
    }
}
77
78/// A queue of encoded packets, to be sent to the `listener` when it polls next.
79struct OutputQueue {
80    /// The listener. Woken when a packet arrives after a previous poll() returned Pending.
81    listener: Listener,
82    /// A queue of encoded packets to be delivered to the receiver.
83    queue: VecDeque<Packet>,
84    /// True when the stream has received an end-of-stream message. The stream will return None
85    /// after the `queue` is empty.
86    ended: bool,
87}
88
89impl OutputQueue {
90    /// Adds a packet to the queue and wakes the listener if necessary.
91    fn enqueue(&mut self, packet: Packet) {
92        self.queue.push_back(packet);
93        self.listener.wake();
94    }
95
96    /// Signals the end of the stream has happened.
97    /// Wakes the listener if necessary.
98    fn mark_ended(&mut self) {
99        self.ended = true;
100        self.listener.wake();
101    }
102
103    fn waker(&self) -> Option<&Waker> {
104        self.listener.waker()
105    }
106
107    /// Wakes the listener so that it will repoll, if it is waiting.
108    fn wake(&mut self) {
109        self.listener.wake();
110    }
111}
112
113impl Default for OutputQueue {
114    fn default() -> Self {
115        OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
116    }
117}
118
119impl Stream for OutputQueue {
120    type Item = Packet;
121
122    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        match self.queue.pop_front() {
124            Some(packet) => Poll::Ready(Some(packet)),
125            None if self.ended => Poll::Ready(None),
126            None => {
127                self.listener.register(cx.waker().clone());
128                Poll::Pending
129            }
130        }
131    }
132}
133
// Minimum size requested for each input buffer. The minimum specified by the codec is too small
// to contain the typical pcm frame chunk size for the encoder case (1024), so it is raised to a
// reasonable amount.
const MIN_INPUT_BUFFER_SIZE: u32 = 4 * 1024;
// Minimum size requested for each output buffer. Zero leaves the choice to the codec default,
// for frame alignment.
const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
139
/// Identifies one of the input buffers shared between the client and the StreamProcessor by its
/// index.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct InputBufferIndex(u32);
143
144/// The StreamProcessorInner handles the events that come from the StreamProcessor, mostly related
145/// to setup of the buffers and handling the output packets as they arrive.
146struct StreamProcessorInner {
147    /// The proxy to the stream processor.
148    processor: StreamProcessorProxy,
149    /// The proxy to the sysmem allocator.
150    sysmem_client: AllocatorProxy,
151    /// The event stream from the StreamProcessor.  We handle these internally.
152    events: StreamProcessorEventStream,
153    /// The size in bytes of each input packet
154    input_packet_size: u64,
155    /// The set of input buffers that are available for writing by the client, without the one
156    /// possibly being used by the input_cursor.
157    client_owned: HashSet<InputBufferIndex>,
158    /// A cursor on the next input buffer location to be written to when new input data arrives.
159    input_cursor: Option<(InputBufferIndex, u64)>,
160    /// An queue of the indexes of output buffers that have been filled by the processor and a
161    /// waiter if someone is waiting on it.
162    /// Also holds the output waker, if it is registered.
163    output_queue: Mutex<OutputQueue>,
164    /// Waker that is waiting on input to be ready.
165    input_waker: Option<Waker>,
166    /// Allocation for the input buffers.
167    input_allocation: MaybeDone<SysmemAllocation>,
168    /// Allocation for the output buffers.
169    output_allocation: MaybeDone<SysmemAllocation>,
170}
171
172impl StreamProcessorInner {
173    /// Handles an event from the StreamProcessor. A number of these events come on stream start to
174    /// setup the input and output buffers, and from then on the output packets and end of stream
175    /// marker, and the input packets are marked as usable after they are processed.
176    fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
177        match evt {
178            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
179                let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
180                let buffer_constraints =
181                    Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
182                let processor = self.processor.clone();
183                let mut partial_settings = Self::partial_settings();
184                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
185                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
186                    // we can convert here until StreamProcessor has a sysmem2 token field.
187                    partial_settings.sysmem_token =
188                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
189                            token.into_channel(),
190                        ));
191                    // FIDL failures will be caught via the request stream.
192                    if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
193                        warn!("Couldn't set input buffer settings: {:?}", e);
194                    }
195                };
196                self.input_allocation = maybe_done(SysmemAllocation::allocate(
197                    self.sysmem_client.clone(),
198                    BufferName { name: "StreamProcessorInput", priority: 1 },
199                    None,
200                    buffer_constraints,
201                    token_fn,
202                )?);
203            }
204            StreamProcessorEvent::OnOutputConstraints { output_config } => {
205                let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
206                if !output_constraints.buffer_constraints_action_required {
207                    return Ok(());
208                }
209                let buffer_constraints =
210                    Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
211                let processor = self.processor.clone();
212                let mut partial_settings = Self::partial_settings();
213                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
214                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
215                    // we can convert here until StreamProcessor has a sysmem2 token field.
216                    partial_settings.sysmem_token =
217                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
218                            token.into_channel(),
219                        ));
220                    // FIDL failures will be caught via the request stream.
221                    if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
222                        warn!("Couldn't set output buffer settings: {:?}", e);
223                    }
224                };
225
226                self.output_allocation = maybe_done(SysmemAllocation::allocate(
227                    self.sysmem_client.clone(),
228                    BufferName { name: "StreamProcessorOutput", priority: 1 },
229                    None,
230                    buffer_constraints,
231                    token_fn,
232                )?);
233            }
234            StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
235                let mut lock = self.output_queue.lock();
236                lock.enqueue(output_packet);
237            }
238            StreamProcessorEvent::OnFreeInputPacket {
239                free_input_packet: PacketHeader { packet_index: Some(idx), .. },
240            } => {
241                if !self.client_owned.insert(InputBufferIndex(idx)) {
242                    warn!("Freed an input packet that was already freed: {:?}", idx);
243                }
244                self.setup_input_cursor();
245            }
246            StreamProcessorEvent::OnOutputEndOfStream { .. } => {
247                let mut lock = self.output_queue.lock();
248                lock.mark_ended();
249            }
250            StreamProcessorEvent::OnOutputFormat { .. } => {}
251            e => trace!("Unhandled stream processor event: {:?}", e),
252        }
253        Ok(())
254    }
255
256    /// Process one event, and return Poll::Ready if the item has been processed,
257    /// and Poll::Pending if no event has been processed and the waker will be woken if
258    /// another event happens.
259    fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
260        match ready!(self.events.poll_next_unpin(cx)) {
261            Some(Err(e)) => Poll::Ready(Err(e.into())),
262            Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
263            None => Poll::Ready(Err(format_err!("Client disconnected"))),
264        }
265    }
266
267    fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
268        BufferCollectionConstraints {
269            buffer_memory_constraints: Some(BufferMemoryConstraints {
270                min_size_bytes: Some(min_buffer_size as u64),
271                ..Default::default()
272            }),
273            ..buffer_collection_constraints_default()
274        }
275    }
276
277    fn partial_settings() -> StreamBufferPartialSettings {
278        StreamBufferPartialSettings {
279            buffer_lifetime_ordinal: Some(1),
280            buffer_constraints_version_ordinal: Some(1),
281            sysmem_token: None,
282            ..Default::default()
283        }
284    }
285
286    fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
287        Pin::new(&mut self.input_allocation)
288            .output_mut()
289            .expect("allocation completed")
290            .as_mut()
291            .expect("succcessful allocation")
292    }
293
294    fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
295        Pin::new(&mut self.output_allocation)
296            .output_mut()
297            .expect("allocation completed")
298            .as_mut()
299            .expect("succcessful allocation")
300    }
301
302    /// Called when the input_allocation future finishes.
303    /// Takes the buffers out of the allocator, and sets up the input cursor to accept data.
304    fn input_allocation_complete(&mut self) -> Result<(), Error> {
305        let _ = Pin::new(&mut self.input_allocation)
306            .output_mut()
307            .ok_or_else(|| format_err!("allocation isn't complete"))?;
308
309        let settings = self.input_buffers().settings();
310        self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
311        let buffer_count = self.input_buffers().len();
312        for i in 0..buffer_count {
313            let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
314        }
315        // allocation is complete, and we can write to the input.
316        self.setup_input_cursor();
317        Ok(())
318    }
319
320    /// Called when the output allocation future finishes.
321    /// Takes the buffers out of the allocator, and sets up the output buffers for retrieval of output,
322    /// signaling to the processor that the output buffers are set.
323    fn output_allocation_complete(&mut self) -> Result<(), Error> {
324        let _ = Pin::new(&mut self.output_allocation)
325            .output_mut()
326            .ok_or_else(|| format_err!("allocation isn't complete"))?;
327        self.processor
328            .complete_output_buffer_partial_settings(/*buffer_lifetime_ordinal=*/ 1)
329            .context("setting output buffer settings")?;
330        Ok(())
331    }
332
333    /// Poll any of the allocations that are waiting to complete, returning Pending if
334    /// any are still waiting to finish, and Ready if one has failed or both have completed.
335    fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
336        if let MaybeDone::Future(_) = self.input_allocation {
337            match Pin::new(&mut self.input_allocation).poll(cx) {
338                Poll::Ready(()) => {
339                    if let Err(e) = self.input_allocation_complete() {
340                        return Poll::Ready(Err(e));
341                    }
342                }
343                Poll::Pending => {}
344            };
345        }
346        if let MaybeDone::Future(_) = self.output_allocation {
347            match Pin::new(&mut self.output_allocation).poll(cx) {
348                Poll::Ready(()) => {
349                    if let Err(e) = self.output_allocation_complete() {
350                        return Poll::Ready(Err(e));
351                    }
352                }
353                Poll::Pending => {}
354            };
355        }
356        Poll::Pending
357    }
358
359    /// Provides the current registered waiting context with priority given to the output waker.
360    fn waiting_waker(&self) -> Option<Waker> {
361        match (self.output_queue.lock().waker(), &self.input_waker) {
362            // No one is waiting.
363            (None, None) => None,
364            (Some(waker), _) => Some(waker.clone()),
365            (_, Some(waker)) => Some(waker.clone()),
366        }
367    }
368
369    /// Process all the events that are currently available from the StreamProcessor and Allocators,
370    /// waking any known waker to be woken when another event arrives.
371    /// Returns Ok(()) if this was accomplished or Err() if an error occurred while processing.
372    fn poll_events(&mut self) -> Result<(), Error> {
373        let waker = loop {
374            let waker = match self.waiting_waker() {
375                // No one still needs to be woken.  This means all the wakers have been awoke,
376                // and will repoll.
377                None => return Ok(()),
378                Some(waker) => waker,
379            };
380            match self.process_event(&mut Context::from_waker(&waker)) {
381                Poll::Pending => break waker,
382                Poll::Ready(Err(e)) => {
383                    warn!("Stream processing error: {:?}", e);
384                    return Err(e.into());
385                }
386                // Didn't set the waker to be awoken, so let's try again.
387                Poll::Ready(Ok(())) => {}
388            }
389        };
390
391        if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
392            warn!("Stream buffer allocation error: {:?}", e);
393            return Err(e.into());
394        }
395        Ok(())
396    }
397
398    fn wake_output(&mut self) {
399        self.output_queue.lock().wake();
400    }
401
402    fn wake_input(&mut self) {
403        if let Some(w) = self.input_waker.take() {
404            w.wake();
405        }
406    }
407
408    /// Attempts to set up a new input cursor, out of the current set of client owned input buffers.
409    /// If the cursor is already set, this does nothing.
410    fn setup_input_cursor(&mut self) {
411        if self.input_cursor.is_some() {
412            // Nothing to be done
413            return;
414        }
415        let next_idx = match self.client_owned.iter().next() {
416            None => return,
417            Some(idx) => idx.clone(),
418        };
419        let _ = self.client_owned.remove(&next_idx);
420        self.input_cursor = Some((next_idx, 0));
421        self.wake_input();
422    }
423
424    /// Reads an output packet from the output buffers, and marks the packets as recycled so the
425    /// output buffer can be reused. Allocates a new vector to hold the data.
426    fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
427        let packet = ValidPacket::try_from(packet)?;
428
429        let output_size = packet.valid_length_bytes as usize;
430        let offset = packet.start_offset as u64;
431        let mut output = vec![0; output_size];
432        let buf_idx = packet.buffer_index;
433        let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
434        vmo.read(&mut output, offset)?;
435        self.processor.recycle_output_packet(&packet.header.into())?;
436        Ok(output)
437    }
438}
439
440/// Struct representing a CodecFactory .
441/// Input sent to the encoder via `StreamProcessor::write_bytes` is queued for delivery, and delivered
442/// whenever a packet is full or `StreamProcessor::send_packet` is called.  Output can be retrieved using
443/// an `StreamProcessorStream` from `StreamProcessor::take_output_stream`.
444pub struct StreamProcessor {
445    inner: Arc<RwLock<StreamProcessorInner>>,
446}
447
448/// An StreamProcessorStream is a Stream of processed data from a stream processor.
449/// Returned from `StreamProcessor::take_output_stream`.
450pub struct StreamProcessorOutputStream {
451    inner: Arc<RwLock<StreamProcessorInner>>,
452}
453
454impl StreamProcessor {
455    /// Create a new StreamProcessor given the proxy.
456    /// Takes the event stream of the proxy.
457    fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
458        let events = processor.take_event_stream();
459        Self {
460            inner: Arc::new(RwLock::new(StreamProcessorInner {
461                processor,
462                sysmem_client,
463                events,
464                input_packet_size: 0,
465                client_owned: HashSet::new(),
466                input_cursor: None,
467                output_queue: Default::default(),
468                input_waker: None,
469                input_allocation: maybe_done(SysmemAllocation::pending()),
470                output_allocation: maybe_done(SysmemAllocation::pending()),
471            })),
472        }
473    }
474
475    /// Create a new StreamProcessor encoder, with the given `input_domain` and `encoder_settings`.  See
476    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
477    /// encoding.
478    pub fn create_encoder(
479        input_domain: DomainFormat,
480        encoder_settings: EncoderSettings,
481    ) -> Result<StreamProcessor, Error> {
482        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
483            .context("Connecting to sysmem")?;
484
485        let format_details = FormatDetails {
486            domain: Some(input_domain),
487            encoder_settings: Some(encoder_settings),
488            format_details_version_ordinal: Some(1),
489            mime_type: Some("audio/pcm".to_string()),
490            oob_bytes: None,
491            pass_through_parameters: None,
492            timebase: None,
493            ..Default::default()
494        };
495
496        let encoder_params = CreateEncoderParams {
497            input_details: Some(format_details),
498            require_hw: Some(false),
499            ..Default::default()
500        };
501
502        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
503            .context("Failed to connect to Codec Factory")?;
504
505        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
506
507        codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
508
509        Ok(StreamProcessor::create(processor, sysmem_client))
510    }
511
512    /// Create a new StreamProcessor decoder, with the given `mime_type` and optional `oob_bytes`.  See
513    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
514    /// decoding.
515    pub fn create_decoder(
516        mime_type: &str,
517        oob_bytes: Option<Vec<u8>>,
518    ) -> Result<StreamProcessor, Error> {
519        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
520            .context("Connecting to sysmem")?;
521
522        let format_details = FormatDetails {
523            mime_type: Some(mime_type.to_string()),
524            oob_bytes: oob_bytes,
525            format_details_version_ordinal: Some(1),
526            encoder_settings: None,
527            domain: None,
528            pass_through_parameters: None,
529            timebase: None,
530            ..Default::default()
531        };
532
533        let decoder_params = CreateDecoderParams {
534            input_details: Some(format_details),
535            permit_lack_of_split_header_handling: Some(true),
536            ..Default::default()
537        };
538
539        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
540            .context("Failed to connect to Codec Factory")?;
541
542        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
543
544        codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
545
546        Ok(StreamProcessor::create(processor, sysmem_client))
547    }
548
549    /// Take a stream object which will produce the output of the processor.
550    /// Only one StreamProcessorOutputStream object can exist at a time, and this will return an Error if it is
551    /// already taken.
552    pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
553        {
554            let read = self.inner.read();
555            let mut lock = read.output_queue.lock();
556            if let Listener::None = lock.listener {
557                lock.listener = Listener::New;
558            } else {
559                return Err(format_err!("Output stream already taken"));
560            }
561        }
562        Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
563    }
564
565    /// Deliver input to the stream processor.  Returns the number of bytes delivered.
566    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
567        let mut bytes_idx = 0;
568        while bytes.len() > bytes_idx {
569            {
570                let mut write = self.inner.write();
571                let (idx, size) = match write.input_cursor.take() {
572                    None => return Ok(bytes_idx),
573                    Some(x) => x,
574                };
575                let space_left = write.input_packet_size - size;
576                let left_to_write = bytes.len() - bytes_idx;
577                let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
578                if space_left as usize > left_to_write {
579                    let write_buf = &bytes[bytes_idx..];
580                    let write_len = write_buf.len();
581                    buffer_vmo.write(write_buf, size).map_err(|s| s.into_io_error())?;
582                    bytes_idx += write_len;
583                    write.input_cursor = Some((idx, size + write_len as u64));
584                    assert!(bytes.len() == bytes_idx);
585                    return Ok(bytes_idx);
586                }
587                let end_idx = bytes_idx + space_left as usize;
588                let write_buf = &bytes[bytes_idx..end_idx];
589                let write_len = write_buf.len();
590                buffer_vmo.write(write_buf, size).map_err(|s| s.into_io_error())?;
591                bytes_idx += write_len;
592                // this buffer is done, ship it!
593                assert_eq!(size + write_len as u64, write.input_packet_size);
594                write.input_cursor = Some((idx, write.input_packet_size));
595            }
596            self.send_packet()?;
597        }
598        Ok(bytes_idx)
599    }
600
601    /// Flush the input buffer to the processor, relinquishing the ownership of the buffer
602    /// currently in the input cursor, and picking a new input buffer.  If there is no input
603    /// buffer left, the input cursor is left as None.
604    pub fn send_packet(&mut self) -> Result<(), io::Error> {
605        let mut write = self.inner.write();
606        if write.input_cursor.is_none() {
607            // Nothing to flush, nothing can have been written to an empty input cursor.
608            return Ok(());
609        }
610        let (idx, size) = write.input_cursor.take().expect("input cursor is none");
611        if size == 0 {
612            // Can't send empty packet to processor.
613            write.input_cursor = Some((idx, size));
614            return Ok(());
615        }
616        let packet = Packet {
617            header: Some(PacketHeader {
618                buffer_lifetime_ordinal: Some(1),
619                packet_index: Some(idx.0),
620                ..Default::default()
621            }),
622            buffer_index: Some(idx.0),
623            stream_lifetime_ordinal: Some(1),
624            start_offset: Some(0),
625            valid_length_bytes: Some(size as u32),
626            start_access_unit: Some(true),
627            known_end_access_unit: Some(true),
628            ..Default::default()
629        };
630        write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
631        // pick another buffer for the input cursor
632        write.setup_input_cursor();
633        Ok(())
634    }
635
636    /// Test whether it is possible to write to the StreamProcessor. If there are no input buffers
637    /// available, returns Poll::Pending and arranges for the input task to receive a
638    /// notification when an input buffer may be available or the encoder is closed.
639    fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
640        let mut write = self.inner.write();
641        // Drop the current input waker, since we have a new one.
642        // If the output waker is set, it should already be queued to be woken for the codec.
643        write.input_waker = None;
644        if write.input_cursor.is_some() {
645            return Poll::Ready(Ok(()));
646        }
647        write.input_waker = Some(cx.waker().clone());
648        // This can:
649        //  - wake the input waker (somehow received a input packet)
650        //  - poll with the output waker, setting it up to be woken
651        //  - poll with the input waker to be woken
652        if let Err(e) = write.poll_events() {
653            return Poll::Ready(Err(io::Error::other(e)));
654        }
655        Poll::Pending
656    }
657
658    pub fn close(&mut self) -> Result<(), io::Error> {
659        self.send_packet()?;
660
661        let mut write = self.inner.write();
662
663        write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
664        // TODO: indicate this another way so that we can send an error if someone tries to write
665        // it after it's closed.
666        write.input_cursor = None;
667        write.wake_input();
668        write.wake_output();
669        Ok(())
670    }
671}
672
673impl AsyncWrite for StreamProcessor {
674    fn poll_write(
675        mut self: Pin<&mut Self>,
676        cx: &mut Context<'_>,
677        buf: &[u8],
678    ) -> Poll<io::Result<usize>> {
679        ready!(self.poll_writable(cx))?;
680        match self.write_bytes(buf) {
681            Ok(written) => Poll::Ready(Ok(written)),
682            Err(e) => Poll::Ready(Err(e.into())),
683        }
684    }
685
686    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
687        Poll::Ready(self.send_packet())
688    }
689
690    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
691        Poll::Ready(self.send_packet())
692    }
693}
694
695impl Stream for StreamProcessorOutputStream {
696    type Item = Result<Vec<u8>, Error>;
697
698    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
699        let mut write = self.inner.write();
700        // If we have a item ready, just return it.
701        let packet = {
702            let mut queue = write.output_queue.lock();
703            match queue.poll_next_unpin(cx) {
704                Poll::Ready(Some(packet)) => Some(Some(packet)),
705                Poll::Ready(None) => Some(None),
706                Poll::Pending => {
707                    // The waker has been set for when the queue gets data.
708                    // We also need to set the same waker if an event happens.
709                    None
710                }
711            }
712        };
713        // We always need to set a waker for the events loop (this may be the same waker as above,
714        // or the input waker if the stream returned a packet)
715        if let Err(e) = write.poll_events() {
716            return Poll::Ready(Some(Err(e.into())));
717        }
718        match packet {
719            Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
720            Some(None) => Poll::Ready(None),
721            None => Poll::Pending,
722        }
723    }
724}
725
726impl FusedStream for StreamProcessorOutputStream {
727    fn is_terminated(&self) -> bool {
728        self.inner.read().output_queue.lock().ended
729    }
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735
736    use async_test_helpers::run_while;
737    use byteorder::{ByteOrder, NativeEndian};
738    use fixture::fixture;
739    use fuchsia_async as fasync;
740    use futures::FutureExt;
741    use futures::io::AsyncWriteExt;
742    use futures_test::task::new_count_waker;
743    use sha2::{Digest as _, Sha256};
744    use std::fs::File;
745    use std::io::{Read, Write};
746    use std::pin::pin;
747
748    use stream_processor_test::ExpectedDigest;
749
750    const PCM_SAMPLE_SIZE: usize = 2;
751
752    #[derive(Clone, Debug)]
753    pub struct PcmAudio {
754        pcm_format: PcmFormat,
755        buffer: Vec<u8>,
756    }
757
758    impl PcmAudio {
759        pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
760            const FREQUENCY: f32 = 20.0;
761            const AMPLITUDE: f32 = 0.2;
762
763            let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
764            let samples_per_frame = pcm_format.channel_map.len();
765            let sample_count = frame_count * samples_per_frame;
766
767            let mut buffer = vec![0; frame_count * pcm_frame_size];
768
769            for i in 0..sample_count {
770                let frame = (i / samples_per_frame) as f32;
771                let value =
772                    ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
773                let sample = (value * i16::max_value() as f32) as i16;
774
775                let mut sample_bytes = [0; std::mem::size_of::<i16>()];
776                NativeEndian::write_i16(&mut sample_bytes, sample);
777
778                let offset = i * PCM_SAMPLE_SIZE;
779                buffer[offset] = sample_bytes[0];
780                buffer[offset + 1] = sample_bytes[1];
781            }
782
783            Self { pcm_format, buffer }
784        }
785
786        pub fn frame_size(&self) -> usize {
787            self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
788        }
789    }
790
791    // Note: stolen from audio_encoder_test, update to stream_processor_test lib when this gets
792    // moved.
793    pub struct BytesValidator {
794        pub output_file: Option<&'static str>,
795        pub expected_digest: ExpectedDigest,
796    }
797
798    impl BytesValidator {
799        fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
800            let mut hasher = Sha256::default();
801
802            file.write_all(&bytes)?;
803            hasher.update(&bytes);
804
805            let digest: [u8; 32] = hasher.finalize().into();
806            if self.expected_digest.bytes != digest {
807                return Err(format_err!(
808                    "Expected {}; got {}",
809                    self.expected_digest,
810                    hex::encode(digest)
811                ))
812                .into();
813            }
814
815            Ok(())
816        }
817
818        fn output_file(&self) -> Result<impl Write, Error> {
819            Ok(if let Some(file) = self.output_file {
820                Box::new(std::fs::File::create(file)?) as Box<dyn Write>
821            } else {
822                Box::new(std::io::sink()) as Box<dyn Write>
823            })
824        }
825
826        fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
827            self.write_and_hash(self.output_file()?, &bytes)
828        }
829    }
830
831    #[fuchsia::test]
832    fn encode_sbc() {
833        let mut exec = fasync::TestExecutor::new();
834
835        let pcm_format = PcmFormat {
836            pcm_mode: AudioPcmMode::Linear,
837            bits_per_sample: 16,
838            frames_per_second: 44100,
839            channel_map: vec![AudioChannelId::Cf],
840        };
841
842        let sub_bands = SbcSubBands::SubBands4;
843        let block_count = SbcBlockCount::BlockCount8;
844
845        let input_frames = 3000;
846
847        let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
848
849        let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
850            sub_bands,
851            block_count,
852            allocation: SbcAllocation::AllocLoudness,
853            channel_mode: SbcChannelMode::Mono,
854            bit_pool: 59, // Recommended from the SBC spec for these parameters.
855        });
856
857        let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
858            AudioUncompressedFormat::Pcm(pcm_format),
859        ));
860
861        let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
862            .expect("to create Encoder");
863
864        let frames_per_packet: usize = 8; // Randomly chosen by fair d10 roll.
865        let packet_size = pcm_audio.frame_size() * frames_per_packet;
866        let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
867        let first_packet = packets.next().unwrap();
868
869        // Write an initial frame to the encoder.
870        // This is required to get past allocating the input/output buffers.
871        let written =
872            exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
873        assert_eq!(written, first_packet.len());
874
875        let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
876
877        // Shouldn't be able to take the stream twice
878        assert!(encoder.take_output_stream().is_err());
879
880        // Polling the encoded stream before the encoder has started up should wake it when
881        // output starts happening, set up the poll here.
882        let encoded_fut = pin!(encoded_stream.next());
883
884        let (waker, encoder_fut_wake_count) = new_count_waker();
885        let mut counting_ctx = Context::from_waker(&waker);
886
887        assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
888
889        let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
890
891        for packet in packets {
892            let mut written_fut = encoder.write(&packet);
893
894            let written_bytes =
895                exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
896
897            assert_eq!(packet.len(), written_bytes);
898            frames_sent += packet.len() / pcm_audio.frame_size();
899        }
900
901        encoder.close().expect("stream should always be closable");
902
903        assert_eq!(input_frames, frames_sent);
904
905        // When an unprocessed event has happened on the stream, even if intervening events have been
906        // procesed by the input processes, it should wake the output future to process the events.
907        let woke_count = encoder_fut_wake_count.get();
908        while encoder_fut_wake_count.get() == woke_count {
909            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
910        }
911        assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
912
913        // Get data from the output now.
914        let mut encoded = Vec::new();
915
916        loop {
917            let mut encoded_fut = encoded_stream.next();
918
919            match exec.run_singlethreaded(&mut encoded_fut) {
920                Some(Ok(enc_data)) => {
921                    assert!(!enc_data.is_empty());
922                    encoded.extend_from_slice(&enc_data);
923                }
924                Some(Err(e)) => {
925                    panic!("Unexpected error when polling encoded data: {}", e);
926                }
927                None => {
928                    break;
929                }
930            }
931        }
932
933        // Match the encoded data to the known hash.
934        let expected_digest = ExpectedDigest::new(
935            "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
936            "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
937        );
938        let hash_validator = BytesValidator { output_file: None, expected_digest };
939
940        assert_eq!(6110, encoded.len(), "Encoded size should be equal");
941
942        let validated = hash_validator.validate(encoded.as_slice());
943        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
944    }
945
946    fn fix_sbc_test_file<F>(_name: &str, test: F)
947    where
948        F: FnOnce(Vec<u8>) -> (),
949    {
950        const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
951
952        let mut sbc_data = Vec::new();
953        let _ = File::open(SBC_TEST_FILE)
954            .expect("open test file")
955            .read_to_end(&mut sbc_data)
956            .expect("read test file");
957
958        test(sbc_data)
959    }
960
961    #[fixture(fix_sbc_test_file)]
962    #[fuchsia::test]
963    fn decode_sbc(sbc_data: Vec<u8>) {
964        let mut exec = fasync::TestExecutor::new();
965
966        const SBC_FRAME_SIZE: usize = 72;
967        const INPUT_FRAMES: usize = 23;
968
969        // SBC codec info corresponding to Mono reference stream.
970        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
971        let mut decoder =
972            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
973
974        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
975
976        // Shouldn't be able to take the stream twice
977        assert!(decoder.take_output_stream().is_err());
978
979        let mut frames_sent = 0;
980
981        let frames_per_packet: usize = 1; // Randomly chosen by fair d10 roll.
982        let packet_size = SBC_FRAME_SIZE * frames_per_packet;
983
984        for frames in sbc_data.as_slice().chunks(packet_size) {
985            let mut written_fut = decoder.write(&frames);
986
987            let written_bytes =
988                exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
989
990            assert_eq!(frames.len(), written_bytes);
991            frames_sent += frames.len() / SBC_FRAME_SIZE;
992        }
993
994        assert_eq!(INPUT_FRAMES, frames_sent);
995
996        let mut flush_fut = pin!(decoder.flush());
997        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
998
999        decoder.close().expect("stream should always be closable");
1000
1001        // Get data from the output now.
1002        let mut decoded = Vec::new();
1003
1004        loop {
1005            let mut decoded_fut = decoded_stream.next();
1006
1007            match exec.run_singlethreaded(&mut decoded_fut) {
1008                Some(Ok(dec_data)) => {
1009                    assert!(!dec_data.is_empty());
1010                    decoded.extend_from_slice(&dec_data);
1011                }
1012                Some(Err(e)) => {
1013                    panic!("Unexpected error when polling decoded data: {}", e);
1014                }
1015                None => {
1016                    break;
1017                }
1018            }
1019        }
1020
1021        // Match the decoded data to the known hash.
1022        let expected_digest = ExpectedDigest::new(
1023            "Pcm: 44.1kHz/16bit/Mono",
1024            "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
1025        );
1026        let hash_validator = BytesValidator { output_file: None, expected_digest };
1027
1028        assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
1029
1030        let validated = hash_validator.validate(decoded.as_slice());
1031        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
1032    }
1033
    // Checks that an output future which was polled (and left pending) before any output
    // existed is woken exactly once when decoder events arrive, and that the next poll of the
    // output stream then yields decoded data.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        // SBC codec info corresponding to Mono reference stream.
        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        let next_frame = chunks.next().unwrap();

        // Write an initial frame to the decoder.
        // This is required to get past allocating the input/output buffers.
        let written =
            exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
        assert_eq!(written, next_frame.len());

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        // Polling the decoded stream before the decoder has started up should wake it when
        // output starts happening, set up the poll here.
        let decoded_fut = pin!(decoded_stream.next());

        // The counting waker lets the test observe how many times the output future is woken.
        let (waker, decoder_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        assert!(decoded_fut.poll(&mut counting_ctx).is_pending());

        // Send only one frame. This is not enough to automatically cause output to be generated
        // by pushing data.
        let frame = chunks.next().unwrap();
        let mut written_fut = decoder.write(&frame);
        let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
        assert_eq!(frame.len(), written_bytes);

        let mut flush_fut = pin!(decoder.flush());
        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");

        // When an unprocessed event has happened on the stream, even if intervening events have
        // been processed by the input processes, it should wake the output future to process
        // the events.
        assert_eq!(decoder_fut_wake_count.get(), 0);
        // Run the executor (on a future that never completes) until the waker fires.
        while decoder_fut_wake_count.get() == 0 {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
        assert_eq!(decoder_fut_wake_count.get(), 1);

        let mut decoded = Vec::new();
        // Drops the previous decoder future, which is fine.
        let mut decoded_fut = decoded_stream.next();

        match exec.run_singlethreaded(&mut decoded_fut) {
            Some(Ok(dec_data)) => {
                assert!(!dec_data.is_empty());
                decoded.extend_from_slice(&dec_data);
            }
            x => panic!("Expected decoded frame, got {:?}", x),
        }

        assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
    }
1097
    // Checks that a writer which stalled with a pending write is woken when a decoder event
    // arrives, even though no output waker is registered at that point.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        // SBC codec info corresponding to Mono reference stream.
        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        let decoded_fut = pin!(decoded_stream.next());

        // `cycle()` repeats the file's frames endlessly, so the fill loops below only stop when
        // a write stalls.
        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE).cycle();
        let next_frame = chunks.next().unwrap();

        // Write an initial frame to the decoder.
        // This is to get past allocating the input/output buffers stage.
        // TODO(https://fxbug.dev/42081385): Both futures need to be polled here even though it's only the
        // writer we really care about because currently decoded_fut is needed to drive the
        // allocation process.
        let (written_res, mut decoded_fut) =
            run_while(&mut exec, decoded_fut, decoder.write(next_frame));
        assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());

        // Write to the decoder until we cannot write anymore, because there are no input buffers
        // available.  This should happen when all the input buffers are full and the input
        // buffers are waiting to be written.
        let (waker, write_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        let mut wake_count_before_stall = 0;
        for frame in chunks {
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                // The poll_unpin can wake the input waker if an event arrived for it, meaning we should
                // continue filling.
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                // We should have never been woken until now, because we always were ready before,
                // and the output waker is not registered (so can't progress)
                break;
            }
            // Flush the packet, to make input buffers get spent faster.
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // We should be able to get a decoded output, once the codec does its thing.
        let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
        assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");

        // Fill the input buffer again so the input waker is registered.
        let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE).cycle();
        for frame in chunks {
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                // The poll_unpin can wake the input waker if an event arrived for it, meaning we should
                // continue filling.
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                break;
            }
            // Flush the packet, to make input buffers get spent faster.
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // The input waker should be the one waiting on events from the codec and get woken up,
        // even if an output event happens.
        // At some point, we will get an event from the decoder, with no output waker set, and this
        // should wake the input waker, which is waiting to be woken up.
        while write_fut_wake_count.get() == wake_count_before_stall {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }

        // Note: at this point, we may not be able to write another frame, but the waiter should
        // repoll, and set the waker again.
    }
1183}