fuchsia_audio_codec/
stream_processor.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context as _, Error, format_err};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{MaybeDone, maybe_done};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{Future, StreamExt, ready};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use crate::buffer_collection_constraints::buffer_collection_constraints_default;
24use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
25
26fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
27    io::Error::other(format_err!("Fidl Error: {}", e))
28}
29
/// Listener is a three-valued Option that tracks whether anyone is listening, and if so, holds
/// the waker captured when the listener polled (rather than when it was created).
#[derive(Debug, Default)]
enum Listener {
    /// No one is listening.
    #[default]
    None,
    /// Someone is listening, but they have either been woken and have not repolled yet, or have
    /// never polled.
    New,
    /// Someone is listening, and can be woken with the waker.
    Some(Waker),
}

impl Listener {
    /// Stores `waker` so that it is woken by the next `Listener::wake`, replacing any waker
    /// stored previously.
    /// Panics if no one is listening.
    fn register(&mut self, waker: Waker) {
        if matches!(self, Listener::None) {
            panic!("Polled a listener with no pollers");
        }
        *self = Listener::Some(waker);
    }

    /// Wakes the stored waker, if any, and resets the state to `New`.
    /// Does nothing when no one is listening.
    fn wake(&mut self) {
        if matches!(self, Listener::None) {
            return;
        }
        // Both `New` and `Some` end up as `New`; only `Some` has a waker to wake.
        if let Listener::Some(waker) = mem::replace(self, Listener::New) {
            waker.wake();
        }
    }

    /// Returns a reference to the stored waker, if a listener has polled and is waiting.
    fn waker(&self) -> Option<&Waker> {
        match self {
            Listener::Some(waker) => Some(waker),
            Listener::None | Listener::New => None,
        }
    }
}
76
77/// A queue of encoded packets, to be sent to the `listener` when it polls next.
78struct OutputQueue {
79    /// The listener. Woken when a packet arrives after a previous poll() returned Pending.
80    listener: Listener,
81    /// A queue of encoded packets to be delivered to the receiver.
82    queue: VecDeque<Packet>,
83    /// True when the stream has received an end-of-stream message. The stream will return None
84    /// after the `queue` is empty.
85    ended: bool,
86}
87
88impl OutputQueue {
89    /// Adds a packet to the queue and wakes the listener if necessary.
90    fn enqueue(&mut self, packet: Packet) {
91        self.queue.push_back(packet);
92        self.listener.wake();
93    }
94
95    /// Signals the end of the stream has happened.
96    /// Wakes the listener if necessary.
97    fn mark_ended(&mut self) {
98        self.ended = true;
99        self.listener.wake();
100    }
101
102    fn waker(&self) -> Option<&Waker> {
103        self.listener.waker()
104    }
105
106    /// Wakes the listener so that it will repoll, if it is waiting.
107    fn wake(&mut self) {
108        self.listener.wake();
109    }
110}
111
112impl Default for OutputQueue {
113    fn default() -> Self {
114        OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
115    }
116}
117
118impl Stream for OutputQueue {
119    type Item = Packet;
120
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        match self.queue.pop_front() {
123            Some(packet) => Poll::Ready(Some(packet)),
124            None if self.ended => Poll::Ready(None),
125            None => {
126                self.listener.register(cx.waker().clone());
127                Poll::Pending
128            }
129        }
130    }
131}
132
/// Minimum size, in bytes, requested for input buffers. The minimum specified by the codec is
/// too small to contain the typical PCM frame chunk size for the encoder case (1024), so it is
/// increased to a reasonable amount.
const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
/// Minimum size, in bytes, requested for output buffers. Zero goes with the codec default for
/// output, for frame alignment.
const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
138
/// Index of an input buffer to be shared between the client and the StreamProcessor.
/// The wrapped value is used as both the packet index and the buffer index when input is queued.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct InputBufferIndex(u32);
142
/// The StreamProcessorInner handles the events that come from the StreamProcessor, mostly related
/// to setup of the buffers and handling the output packets as they arrive.
struct StreamProcessorInner {
    /// The proxy to the stream processor.
    processor: StreamProcessorProxy,
    /// The proxy to the sysmem allocator.
    sysmem_client: AllocatorProxy,
    /// The event stream from the StreamProcessor.  We handle these internally.
    events: StreamProcessorEventStream,
    /// The size in bytes of each input packet. Zero until the input allocation completes.
    input_packet_size: u64,
    /// The set of input buffers that are available for writing by the client, without the one
    /// possibly being used by the input_cursor.
    client_owned: HashSet<InputBufferIndex>,
    /// A cursor on the next input buffer location to be written to when new input data arrives:
    /// the buffer index and the number of bytes already written to that buffer.
    input_cursor: Option<(InputBufferIndex, u64)>,
    /// A queue of the output packets that have been produced by the processor but not yet
    /// delivered to the output stream.
    /// Also holds the output waker, if it is registered.
    output_queue: Mutex<OutputQueue>,
    /// Waker that is waiting on input to be ready.
    input_waker: Option<Waker>,
    /// Allocation for the input buffers.
    input_allocation: MaybeDone<SysmemAllocation>,
    /// Allocation for the output buffers.
    output_allocation: MaybeDone<SysmemAllocation>,
}
170
171impl StreamProcessorInner {
172    /// Handles an event from the StreamProcessor. A number of these events come on stream start to
173    /// setup the input and output buffers, and from then on the output packets and end of stream
174    /// marker, and the input packets are marked as usable after they are processed.
175    fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
176        match evt {
177            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
178                let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
179                let buffer_constraints =
180                    Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
181                let processor = self.processor.clone();
182                let mut partial_settings = Self::partial_settings();
183                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
184                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
185                    // we can convert here until StreamProcessor has a sysmem2 token field.
186                    partial_settings.sysmem_token =
187                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
188                            token.into_channel(),
189                        ));
190                    // FIDL failures will be caught via the request stream.
191                    if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
192                        warn!("Couldn't set input buffer settings: {:?}", e);
193                    }
194                };
195                self.input_allocation = maybe_done(SysmemAllocation::allocate(
196                    self.sysmem_client.clone(),
197                    BufferName { name: "StreamProcessorInput", priority: 1 },
198                    None,
199                    buffer_constraints,
200                    token_fn,
201                )?);
202            }
203            StreamProcessorEvent::OnOutputConstraints { output_config } => {
204                let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
205                if !output_constraints.buffer_constraints_action_required {
206                    return Ok(());
207                }
208                let buffer_constraints =
209                    Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
210                let processor = self.processor.clone();
211                let mut partial_settings = Self::partial_settings();
212                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
213                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
214                    // we can convert here until StreamProcessor has a sysmem2 token field.
215                    partial_settings.sysmem_token =
216                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
217                            token.into_channel(),
218                        ));
219                    // FIDL failures will be caught via the request stream.
220                    if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
221                        warn!("Couldn't set output buffer settings: {:?}", e);
222                    }
223                };
224
225                self.output_allocation = maybe_done(SysmemAllocation::allocate(
226                    self.sysmem_client.clone(),
227                    BufferName { name: "StreamProcessorOutput", priority: 1 },
228                    None,
229                    buffer_constraints,
230                    token_fn,
231                )?);
232            }
233            StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
234                let mut lock = self.output_queue.lock();
235                lock.enqueue(output_packet);
236            }
237            StreamProcessorEvent::OnFreeInputPacket {
238                free_input_packet: PacketHeader { packet_index: Some(idx), .. },
239            } => {
240                if !self.client_owned.insert(InputBufferIndex(idx)) {
241                    warn!("Freed an input packet that was already freed: {:?}", idx);
242                }
243                self.setup_input_cursor();
244            }
245            StreamProcessorEvent::OnOutputEndOfStream { .. } => {
246                let mut lock = self.output_queue.lock();
247                lock.mark_ended();
248            }
249            StreamProcessorEvent::OnOutputFormat { .. } => {}
250            e => trace!("Unhandled stream processor event: {:?}", e),
251        }
252        Ok(())
253    }
254
255    /// Process one event, and return Poll::Ready if the item has been processed,
256    /// and Poll::Pending if no event has been processed and the waker will be woken if
257    /// another event happens.
258    fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
259        match ready!(self.events.poll_next_unpin(cx)) {
260            Some(Err(e)) => Poll::Ready(Err(e.into())),
261            Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
262            None => Poll::Ready(Err(format_err!("Client disconnected"))),
263        }
264    }
265
266    fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
267        BufferCollectionConstraints {
268            buffer_memory_constraints: Some(BufferMemoryConstraints {
269                min_size_bytes: Some(min_buffer_size as u64),
270                ..Default::default()
271            }),
272            ..buffer_collection_constraints_default()
273        }
274    }
275
276    fn partial_settings() -> StreamBufferPartialSettings {
277        StreamBufferPartialSettings {
278            buffer_lifetime_ordinal: Some(1),
279            buffer_constraints_version_ordinal: Some(1),
280            sysmem_token: None,
281            ..Default::default()
282        }
283    }
284
285    fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
286        Pin::new(&mut self.input_allocation)
287            .output_mut()
288            .expect("allocation completed")
289            .as_mut()
290            .expect("succcessful allocation")
291    }
292
293    fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
294        Pin::new(&mut self.output_allocation)
295            .output_mut()
296            .expect("allocation completed")
297            .as_mut()
298            .expect("succcessful allocation")
299    }
300
301    /// Called when the input_allocation future finishes.
302    /// Takes the buffers out of the allocator, and sets up the input cursor to accept data.
303    fn input_allocation_complete(&mut self) -> Result<(), Error> {
304        let _ = Pin::new(&mut self.input_allocation)
305            .output_mut()
306            .ok_or_else(|| format_err!("allocation isn't complete"))?;
307
308        let settings = self.input_buffers().settings();
309        self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
310        let buffer_count = self.input_buffers().len();
311        for i in 0..buffer_count {
312            let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
313        }
314        // allocation is complete, and we can write to the input.
315        self.setup_input_cursor();
316        Ok(())
317    }
318
319    /// Called when the output allocation future finishes.
320    /// Takes the buffers out of the allocator, and sets up the output buffers for retrieval of output,
321    /// signaling to the processor that the output buffers are set.
322    fn output_allocation_complete(&mut self) -> Result<(), Error> {
323        let _ = Pin::new(&mut self.output_allocation)
324            .output_mut()
325            .ok_or_else(|| format_err!("allocation isn't complete"))?;
326        self.processor
327            .complete_output_buffer_partial_settings(/*buffer_lifetime_ordinal=*/ 1)
328            .context("setting output buffer settings")?;
329        Ok(())
330    }
331
332    /// Poll any of the allocations that are waiting to complete, returning Pending if
333    /// any are still waiting to finish, and Ready if one has failed or both have completed.
334    fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
335        if let MaybeDone::Future(_) = self.input_allocation {
336            match Pin::new(&mut self.input_allocation).poll(cx) {
337                Poll::Ready(()) => {
338                    if let Err(e) = self.input_allocation_complete() {
339                        return Poll::Ready(Err(e));
340                    }
341                }
342                Poll::Pending => {}
343            };
344        }
345        if let MaybeDone::Future(_) = self.output_allocation {
346            match Pin::new(&mut self.output_allocation).poll(cx) {
347                Poll::Ready(()) => {
348                    if let Err(e) = self.output_allocation_complete() {
349                        return Poll::Ready(Err(e));
350                    }
351                }
352                Poll::Pending => {}
353            };
354        }
355        Poll::Pending
356    }
357
358    /// Provides the current registered waiting context with priority given to the output waker.
359    fn waiting_waker(&self) -> Option<Waker> {
360        match (self.output_queue.lock().waker(), &self.input_waker) {
361            // No one is waiting.
362            (None, None) => None,
363            (Some(waker), _) => Some(waker.clone()),
364            (_, Some(waker)) => Some(waker.clone()),
365        }
366    }
367
368    /// Process all the events that are currently available from the StreamProcessor and Allocators,
369    /// waking any known waker to be woken when another event arrives.
370    /// Returns Ok(()) if this was accomplished or Err() if an error occurred while processing.
371    fn poll_events(&mut self) -> Result<(), Error> {
372        let waker = loop {
373            let waker = match self.waiting_waker() {
374                // No one still needs to be woken.  This means all the wakers have been awoke,
375                // and will repoll.
376                None => return Ok(()),
377                Some(waker) => waker,
378            };
379            match self.process_event(&mut Context::from_waker(&waker)) {
380                Poll::Pending => break waker,
381                Poll::Ready(Err(e)) => {
382                    warn!("Stream processing error: {:?}", e);
383                    return Err(e.into());
384                }
385                // Didn't set the waker to be awoken, so let's try again.
386                Poll::Ready(Ok(())) => {}
387            }
388        };
389
390        if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
391            warn!("Stream buffer allocation error: {:?}", e);
392            return Err(e.into());
393        }
394        Ok(())
395    }
396
397    fn wake_output(&mut self) {
398        self.output_queue.lock().wake();
399    }
400
401    fn wake_input(&mut self) {
402        if let Some(w) = self.input_waker.take() {
403            w.wake();
404        }
405    }
406
407    /// Attempts to set up a new input cursor, out of the current set of client owned input buffers.
408    /// If the cursor is already set, this does nothing.
409    fn setup_input_cursor(&mut self) {
410        if self.input_cursor.is_some() {
411            // Nothing to be done
412            return;
413        }
414        let next_idx = match self.client_owned.iter().next() {
415            None => return,
416            Some(idx) => idx.clone(),
417        };
418        let _ = self.client_owned.remove(&next_idx);
419        self.input_cursor = Some((next_idx, 0));
420        self.wake_input();
421    }
422
423    /// Reads an output packet from the output buffers, and marks the packets as recycled so the
424    /// output buffer can be reused. Allocates a new vector to hold the data.
425    fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
426        let packet = ValidPacket::try_from(packet)?;
427
428        let output_size = packet.valid_length_bytes as usize;
429        let offset = packet.start_offset as u64;
430        let mut output = vec![0; output_size];
431        let buf_idx = packet.buffer_index;
432        let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
433        vmo.read(&mut output, offset)?;
434        self.processor.recycle_output_packet(&packet.header.into())?;
435        Ok(output)
436    }
437}
438
/// Struct representing a stream processor (an encoder or a decoder) created through the
/// CodecFactory.
/// Input written to the processor through `AsyncWrite` is queued for delivery, and delivered
/// whenever a packet is full or `StreamProcessor::send_packet` is called.  Output can be retrieved
/// using a `StreamProcessorOutputStream` from `StreamProcessor::take_output_stream`.
pub struct StreamProcessor {
    inner: Arc<RwLock<StreamProcessorInner>>,
}
446
/// A StreamProcessorOutputStream is a Stream of processed data from a stream processor.
/// Returned from `StreamProcessor::take_output_stream`.
pub struct StreamProcessorOutputStream {
    inner: Arc<RwLock<StreamProcessorInner>>,
}
452
453impl StreamProcessor {
454    /// Create a new StreamProcessor given the proxy.
455    /// Takes the event stream of the proxy.
456    fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
457        let events = processor.take_event_stream();
458        Self {
459            inner: Arc::new(RwLock::new(StreamProcessorInner {
460                processor,
461                sysmem_client,
462                events,
463                input_packet_size: 0,
464                client_owned: HashSet::new(),
465                input_cursor: None,
466                output_queue: Default::default(),
467                input_waker: None,
468                input_allocation: maybe_done(SysmemAllocation::pending()),
469                output_allocation: maybe_done(SysmemAllocation::pending()),
470            })),
471        }
472    }
473
474    /// Create a new StreamProcessor encoder, with the given `input_domain` and `encoder_settings`.  See
475    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
476    /// encoding.
477    pub fn create_encoder(
478        input_domain: DomainFormat,
479        encoder_settings: EncoderSettings,
480    ) -> Result<StreamProcessor, Error> {
481        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
482            .context("Connecting to sysmem")?;
483
484        let format_details = FormatDetails {
485            domain: Some(input_domain),
486            encoder_settings: Some(encoder_settings),
487            format_details_version_ordinal: Some(1),
488            mime_type: Some("audio/pcm".to_string()),
489            oob_bytes: None,
490            pass_through_parameters: None,
491            timebase: None,
492            ..Default::default()
493        };
494
495        let encoder_params = CreateEncoderParams {
496            input_details: Some(format_details),
497            require_hw: Some(false),
498            ..Default::default()
499        };
500
501        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
502            .context("Failed to connect to Codec Factory")?;
503
504        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
505
506        codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
507
508        Ok(StreamProcessor::create(processor, sysmem_client))
509    }
510
511    /// Create a new StreamProcessor decoder, with the given `mime_type` and optional `oob_bytes`.  See
512    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
513    /// decoding.
514    pub fn create_decoder(
515        mime_type: &str,
516        oob_bytes: Option<Vec<u8>>,
517    ) -> Result<StreamProcessor, Error> {
518        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
519            .context("Connecting to sysmem")?;
520
521        let format_details = FormatDetails {
522            mime_type: Some(mime_type.to_string()),
523            oob_bytes: oob_bytes,
524            format_details_version_ordinal: Some(1),
525            encoder_settings: None,
526            domain: None,
527            pass_through_parameters: None,
528            timebase: None,
529            ..Default::default()
530        };
531
532        let decoder_params = CreateDecoderParams {
533            input_details: Some(format_details),
534            permit_lack_of_split_header_handling: Some(true),
535            ..Default::default()
536        };
537
538        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
539            .context("Failed to connect to Codec Factory")?;
540
541        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
542
543        codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
544
545        Ok(StreamProcessor::create(processor, sysmem_client))
546    }
547
548    /// Take a stream object which will produce the output of the processor.
549    /// Only one StreamProcessorOutputStream object can exist at a time, and this will return an Error if it is
550    /// already taken.
551    pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
552        {
553            let read = self.inner.read();
554            let mut lock = read.output_queue.lock();
555            if let Listener::None = lock.listener {
556                lock.listener = Listener::New;
557            } else {
558                return Err(format_err!("Output stream already taken"));
559            }
560        }
561        Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
562    }
563
564    /// Deliver input to the stream processor.  Returns the number of bytes delivered.
565    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
566        let mut bytes_idx = 0;
567        while bytes.len() > bytes_idx {
568            {
569                let mut write = self.inner.write();
570                let (idx, size) = match write.input_cursor.take() {
571                    None => return Ok(bytes_idx),
572                    Some(x) => x,
573                };
574                let space_left = write.input_packet_size - size;
575                let left_to_write = bytes.len() - bytes_idx;
576                let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
577                if space_left as usize > left_to_write {
578                    let write_buf = &bytes[bytes_idx..];
579                    let write_len = write_buf.len();
580                    buffer_vmo.write(write_buf, size)?;
581                    bytes_idx += write_len;
582                    write.input_cursor = Some((idx, size + write_len as u64));
583                    assert!(bytes.len() == bytes_idx);
584                    return Ok(bytes_idx);
585                }
586                let end_idx = bytes_idx + space_left as usize;
587                let write_buf = &bytes[bytes_idx..end_idx];
588                let write_len = write_buf.len();
589                buffer_vmo.write(write_buf, size)?;
590                bytes_idx += write_len;
591                // this buffer is done, ship it!
592                assert_eq!(size + write_len as u64, write.input_packet_size);
593                write.input_cursor = Some((idx, write.input_packet_size));
594            }
595            self.send_packet()?;
596        }
597        Ok(bytes_idx)
598    }
599
600    /// Flush the input buffer to the processor, relinquishing the ownership of the buffer
601    /// currently in the input cursor, and picking a new input buffer.  If there is no input
602    /// buffer left, the input cursor is left as None.
603    pub fn send_packet(&mut self) -> Result<(), io::Error> {
604        let mut write = self.inner.write();
605        if write.input_cursor.is_none() {
606            // Nothing to flush, nothing can have been written to an empty input cursor.
607            return Ok(());
608        }
609        let (idx, size) = write.input_cursor.take().expect("input cursor is none");
610        if size == 0 {
611            // Can't send empty packet to processor.
612            write.input_cursor = Some((idx, size));
613            return Ok(());
614        }
615        let packet = Packet {
616            header: Some(PacketHeader {
617                buffer_lifetime_ordinal: Some(1),
618                packet_index: Some(idx.0),
619                ..Default::default()
620            }),
621            buffer_index: Some(idx.0),
622            stream_lifetime_ordinal: Some(1),
623            start_offset: Some(0),
624            valid_length_bytes: Some(size as u32),
625            start_access_unit: Some(true),
626            known_end_access_unit: Some(true),
627            ..Default::default()
628        };
629        write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
630        // pick another buffer for the input cursor
631        write.setup_input_cursor();
632        Ok(())
633    }
634
635    /// Test whether it is possible to write to the StreamProcessor. If there are no input buffers
636    /// available, returns Poll::Pending and arranges for the input task to receive a
637    /// notification when an input buffer may be available or the encoder is closed.
638    fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
639        let mut write = self.inner.write();
640        // Drop the current input waker, since we have a new one.
641        // If the output waker is set, it should already be queued to be woken for the codec.
642        write.input_waker = None;
643        if write.input_cursor.is_some() {
644            return Poll::Ready(Ok(()));
645        }
646        write.input_waker = Some(cx.waker().clone());
647        // This can:
648        //  - wake the input waker (somehow received a input packet)
649        //  - poll with the output waker, setting it up to be woken
650        //  - poll with the input waker to be woken
651        if let Err(e) = write.poll_events() {
652            return Poll::Ready(Err(io::Error::other(e)));
653        }
654        Poll::Pending
655    }
656
657    pub fn close(&mut self) -> Result<(), io::Error> {
658        self.send_packet()?;
659
660        let mut write = self.inner.write();
661
662        write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
663        // TODO: indicate this another way so that we can send an error if someone tries to write
664        // it after it's closed.
665        write.input_cursor = None;
666        write.wake_input();
667        write.wake_output();
668        Ok(())
669    }
670}
671
672impl AsyncWrite for StreamProcessor {
673    fn poll_write(
674        mut self: Pin<&mut Self>,
675        cx: &mut Context<'_>,
676        buf: &[u8],
677    ) -> Poll<io::Result<usize>> {
678        ready!(self.poll_writable(cx))?;
679        match self.write_bytes(buf) {
680            Ok(written) => Poll::Ready(Ok(written)),
681            Err(e) => Poll::Ready(Err(e.into())),
682        }
683    }
684
685    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
686        Poll::Ready(self.send_packet())
687    }
688
689    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
690        Poll::Ready(self.send_packet())
691    }
692}
693
694impl Stream for StreamProcessorOutputStream {
695    type Item = Result<Vec<u8>, Error>;
696
697    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
698        let mut write = self.inner.write();
699        // If we have a item ready, just return it.
700        let packet = {
701            let mut queue = write.output_queue.lock();
702            match queue.poll_next_unpin(cx) {
703                Poll::Ready(Some(packet)) => Some(Some(packet)),
704                Poll::Ready(None) => Some(None),
705                Poll::Pending => {
706                    // The waker has been set for when the queue gets data.
707                    // We also need to set the same waker if an event happens.
708                    None
709                }
710            }
711        };
712        // We always need to set a waker for the events loop (this may be the same waker as above,
713        // or the input waker if the stream returned a packet)
714        if let Err(e) = write.poll_events() {
715            return Poll::Ready(Some(Err(e.into())));
716        }
717        match packet {
718            Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
719            Some(None) => Poll::Ready(None),
720            None => Poll::Pending,
721        }
722    }
723}
724
725impl FusedStream for StreamProcessorOutputStream {
726    fn is_terminated(&self) -> bool {
727        self.inner.read().output_queue.lock().ended
728    }
729}
730
731#[cfg(test)]
732mod tests {
733    use super::*;
734
735    use async_test_helpers::run_while;
736    use byteorder::{ByteOrder, NativeEndian};
737    use fixture::fixture;
738    use fuchsia_async as fasync;
739    use futures::FutureExt;
740    use futures::io::AsyncWriteExt;
741    use futures_test::task::new_count_waker;
742    use sha2::{Digest as _, Sha256};
743    use std::fs::File;
744    use std::io::{Read, Write};
745    use std::pin::pin;
746
747    use stream_processor_test::ExpectedDigest;
748
    /// Size in bytes of one PCM sample in the test audio buffers (samples are written as `i16`).
    const PCM_SAMPLE_SIZE: usize = 2;
750
    /// PCM audio held in memory, used as encoder input by the tests in this module.
    #[derive(Clone, Debug)]
    pub struct PcmAudio {
        /// Format of the audio; its channel map and frame rate are read when generating and
        /// sizing the data in `buffer`.
        pcm_format: PcmFormat,
        /// Raw sample bytes, `PCM_SAMPLE_SIZE` bytes per sample.
        buffer: Vec<u8>,
    }
756
757    impl PcmAudio {
758        pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
759            const FREQUENCY: f32 = 20.0;
760            const AMPLITUDE: f32 = 0.2;
761
762            let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
763            let samples_per_frame = pcm_format.channel_map.len();
764            let sample_count = frame_count * samples_per_frame;
765
766            let mut buffer = vec![0; frame_count * pcm_frame_size];
767
768            for i in 0..sample_count {
769                let frame = (i / samples_per_frame) as f32;
770                let value =
771                    ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
772                let sample = (value * i16::max_value() as f32) as i16;
773
774                let mut sample_bytes = [0; std::mem::size_of::<i16>()];
775                NativeEndian::write_i16(&mut sample_bytes, sample);
776
777                let offset = i * PCM_SAMPLE_SIZE;
778                buffer[offset] = sample_bytes[0];
779                buffer[offset + 1] = sample_bytes[1];
780            }
781
782            Self { pcm_format, buffer }
783        }
784
785        pub fn frame_size(&self) -> usize {
786            self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
787        }
788    }
789
    // Note: stolen from audio_encoder_test, update to stream_processor_test lib when this gets
    // moved.
    /// Validates a byte stream against an expected SHA-256 digest, optionally saving the bytes
    /// to a file.
    pub struct BytesValidator {
        /// Path of a file to create and write the validated bytes to. When `None`, the bytes
        /// are written to a sink and discarded.
        pub output_file: Option<&'static str>,
        /// Digest that the validated bytes are expected to hash to.
        pub expected_digest: ExpectedDigest,
    }
796
797    impl BytesValidator {
798        fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
799            let mut hasher = Sha256::default();
800
801            file.write_all(&bytes)?;
802            hasher.update(&bytes);
803
804            let digest: [u8; 32] = hasher.finalize().into();
805            if self.expected_digest.bytes != digest {
806                return Err(format_err!(
807                    "Expected {}; got {}",
808                    self.expected_digest,
809                    hex::encode(digest)
810                ))
811                .into();
812            }
813
814            Ok(())
815        }
816
817        fn output_file(&self) -> Result<impl Write, Error> {
818            Ok(if let Some(file) = self.output_file {
819                Box::new(std::fs::File::create(file)?) as Box<dyn Write>
820            } else {
821                Box::new(std::io::sink()) as Box<dyn Write>
822            })
823        }
824
825        fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
826            self.write_and_hash(self.output_file()?, &bytes)
827        }
828    }
829
830    #[fuchsia::test]
831    fn encode_sbc() {
832        let mut exec = fasync::TestExecutor::new();
833
834        let pcm_format = PcmFormat {
835            pcm_mode: AudioPcmMode::Linear,
836            bits_per_sample: 16,
837            frames_per_second: 44100,
838            channel_map: vec![AudioChannelId::Cf],
839        };
840
841        let sub_bands = SbcSubBands::SubBands4;
842        let block_count = SbcBlockCount::BlockCount8;
843
844        let input_frames = 3000;
845
846        let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
847
848        let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
849            sub_bands,
850            block_count,
851            allocation: SbcAllocation::AllocLoudness,
852            channel_mode: SbcChannelMode::Mono,
853            bit_pool: 59, // Recommended from the SBC spec for these parameters.
854        });
855
856        let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
857            AudioUncompressedFormat::Pcm(pcm_format),
858        ));
859
860        let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
861            .expect("to create Encoder");
862
863        let frames_per_packet: usize = 8; // Randomly chosen by fair d10 roll.
864        let packet_size = pcm_audio.frame_size() * frames_per_packet;
865        let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
866        let first_packet = packets.next().unwrap();
867
868        // Write an initial frame to the encoder.
869        // This is required to get past allocating the input/output buffers.
870        let written =
871            exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
872        assert_eq!(written, first_packet.len());
873
874        let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
875
876        // Shouldn't be able to take the stream twice
877        assert!(encoder.take_output_stream().is_err());
878
879        // Polling the encoded stream before the encoder has started up should wake it when
880        // output starts happening, set up the poll here.
881        let encoded_fut = pin!(encoded_stream.next());
882
883        let (waker, encoder_fut_wake_count) = new_count_waker();
884        let mut counting_ctx = Context::from_waker(&waker);
885
886        assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
887
888        let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
889
890        for packet in packets {
891            let mut written_fut = encoder.write(&packet);
892
893            let written_bytes =
894                exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
895
896            assert_eq!(packet.len(), written_bytes);
897            frames_sent += packet.len() / pcm_audio.frame_size();
898        }
899
900        encoder.close().expect("stream should always be closable");
901
902        assert_eq!(input_frames, frames_sent);
903
904        // When an unprocessed event has happened on the stream, even if intervening events have been
905        // procesed by the input processes, it should wake the output future to process the events.
906        let woke_count = encoder_fut_wake_count.get();
907        while encoder_fut_wake_count.get() == woke_count {
908            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
909        }
910        assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
911
912        // Get data from the output now.
913        let mut encoded = Vec::new();
914
915        loop {
916            let mut encoded_fut = encoded_stream.next();
917
918            match exec.run_singlethreaded(&mut encoded_fut) {
919                Some(Ok(enc_data)) => {
920                    assert!(!enc_data.is_empty());
921                    encoded.extend_from_slice(&enc_data);
922                }
923                Some(Err(e)) => {
924                    panic!("Unexpected error when polling encoded data: {}", e);
925                }
926                None => {
927                    break;
928                }
929            }
930        }
931
932        // Match the encoded data to the known hash.
933        let expected_digest = ExpectedDigest::new(
934            "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
935            "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
936        );
937        let hash_validator = BytesValidator { output_file: None, expected_digest };
938
939        assert_eq!(6110, encoded.len(), "Encoded size should be equal");
940
941        let validated = hash_validator.validate(encoded.as_slice());
942        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
943    }
944
945    fn fix_sbc_test_file<F>(_name: &str, test: F)
946    where
947        F: FnOnce(Vec<u8>) -> (),
948    {
949        const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
950
951        let mut sbc_data = Vec::new();
952        let _ = File::open(SBC_TEST_FILE)
953            .expect("open test file")
954            .read_to_end(&mut sbc_data)
955            .expect("read test file");
956
957        test(sbc_data)
958    }
959
960    #[fixture(fix_sbc_test_file)]
961    #[fuchsia::test]
962    fn decode_sbc(sbc_data: Vec<u8>) {
963        let mut exec = fasync::TestExecutor::new();
964
965        const SBC_FRAME_SIZE: usize = 72;
966        const INPUT_FRAMES: usize = 23;
967
968        // SBC codec info corresponding to Mono reference stream.
969        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
970        let mut decoder =
971            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
972
973        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
974
975        // Shouldn't be able to take the stream twice
976        assert!(decoder.take_output_stream().is_err());
977
978        let mut frames_sent = 0;
979
980        let frames_per_packet: usize = 1; // Randomly chosen by fair d10 roll.
981        let packet_size = SBC_FRAME_SIZE * frames_per_packet;
982
983        for frames in sbc_data.as_slice().chunks(packet_size) {
984            let mut written_fut = decoder.write(&frames);
985
986            let written_bytes =
987                exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
988
989            assert_eq!(frames.len(), written_bytes);
990            frames_sent += frames.len() / SBC_FRAME_SIZE;
991        }
992
993        assert_eq!(INPUT_FRAMES, frames_sent);
994
995        let mut flush_fut = pin!(decoder.flush());
996        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
997
998        decoder.close().expect("stream should always be closable");
999
1000        // Get data from the output now.
1001        let mut decoded = Vec::new();
1002
1003        loop {
1004            let mut decoded_fut = decoded_stream.next();
1005
1006            match exec.run_singlethreaded(&mut decoded_fut) {
1007                Some(Ok(dec_data)) => {
1008                    assert!(!dec_data.is_empty());
1009                    decoded.extend_from_slice(&dec_data);
1010                }
1011                Some(Err(e)) => {
1012                    panic!("Unexpected error when polling decoded data: {}", e);
1013                }
1014                None => {
1015                    break;
1016                }
1017            }
1018        }
1019
1020        // Match the decoded data to the known hash.
1021        let expected_digest = ExpectedDigest::new(
1022            "Pcm: 44.1kHz/16bit/Mono",
1023            "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
1024        );
1025        let hash_validator = BytesValidator { output_file: None, expected_digest };
1026
1027        assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
1028
1029        let validated = hash_validator.validate(decoded.as_slice());
1030        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
1031    }
1032
    // Checks that an output future which was polled (and is pending) gets woken to process
    // decoder events, even though only the input side has been driven since it was polled.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        // SBC codec info corresponding to Mono reference stream.
        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        let next_frame = chunks.next().unwrap();

        // Write an initial frame to the decoder.
        // This is required to get past allocating the input/output buffers.
        let written =
            exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
        assert_eq!(written, next_frame.len());

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        // Polling the decoded stream before the decoder has started up should wake it when
        // output starts happening, set up the poll here.
        let decoded_fut = pin!(decoded_stream.next());

        // The counting waker lets the test observe how many times the output future is woken.
        let (waker, decoder_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        assert!(decoded_fut.poll(&mut counting_ctx).is_pending());

        // Send only one frame. This is not enough to automatically cause output to be generated
        // by pushing data.
        let frame = chunks.next().unwrap();
        let mut written_fut = decoder.write(&frame);
        let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
        assert_eq!(frame.len(), written_bytes);

        let mut flush_fut = pin!(decoder.flush());
        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");

        // When an unprocessed event has happened on the stream, even if intervening events have
        // been processed by the input side, it should wake the output future to process the
        // events. Spin the executor until the counting waker has been woken.
        assert_eq!(decoder_fut_wake_count.get(), 0);
        while decoder_fut_wake_count.get() == 0 {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
        assert_eq!(decoder_fut_wake_count.get(), 1);

        let mut decoded = Vec::new();
        // Drops the previous decoder future, which is fine.
        let mut decoded_fut = decoded_stream.next();

        match exec.run_singlethreaded(&mut decoded_fut) {
            Some(Ok(dec_data)) => {
                assert!(!dec_data.is_empty());
                decoded.extend_from_slice(&dec_data);
            }
            x => panic!("Expected decoded frame, got {:?}", x),
        }

        assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
    }
1096
    // Checks that a writer which is pending (no input buffers available) gets woken when a
    // decoder event arrives while no output waker is registered.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        // SBC codec info corresponding to Mono reference stream.
        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        let decoded_fut = pin!(decoded_stream.next());

        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        let next_frame = chunks.next().unwrap();

        // Write an initial frame to the decoder.
        // This is to get past allocating the input/output buffers stage.
        // TODO(https://fxbug.dev/42081385): Both futures need to be polled here even though it's
        // only the writer we really care about because currently decoded_fut is needed to drive
        // the allocation process.
        let (written_res, mut decoded_fut) =
            run_while(&mut exec, decoded_fut, decoder.write(next_frame));
        assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());

        // Write to the decoder until we cannot write anymore, because there are no input buffers
        // available.  This should happen when all the input buffers are full and the input
        // buffers are waiting to be written.
        let (waker, write_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        let mut wake_count_before_stall = 0;
        for frame in chunks {
            // Snapshot the wake count so a wake caused by this poll can be told apart from a
            // genuine stall.
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                // The poll_unpin can wake the input waker if an event arrived for it, meaning we
                // should continue filling.
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                // We should have never been woken until now, because we always were ready before,
                // and the output waker is not registered (so can't progress)
                break;
            }
            // Flush the packet, to make input buffers get spent faster.
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // We should be able to get a decoded output, once the codec does its thing.
        let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
        assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");

        // Fill the input buffer again so the input waker is registered.
        let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        for frame in chunks {
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                // The poll_unpin can wake the input waker if an event arrived for it, meaning we
                // should continue filling.
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                break;
            }
            // Flush the packet, to make input buffers get spent faster.
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // The input waker should be the one waiting on events from the codec and get woken up,
        // even if an output event happens.
        // At some point, we will get an event from the decoder, with no output waker set, and
        // this should wake the input waker, which is waiting to be woken up.
        while write_fut_wake_count.get() == wake_count_before_stall {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }

        // Note: at this point, we may not be able to write another frame, but the waiter should
        // repoll, and set the waker again.
    }
1182}