1use anyhow::{Context as _, Error, format_err};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{MaybeDone, maybe_done};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{Future, StreamExt, ready};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use zx::StatusExt;
23
24use crate::buffer_collection_constraints::buffer_collection_constraints_default;
25use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
26
27fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
28 io::Error::other(format_err!("Fidl Error: {}", e))
29}
30
31#[derive(Debug)]
32enum Listener {
35 None,
37 New,
39 Some(Waker),
41}
42
43impl Listener {
44 fn register(&mut self, waker: Waker) {
47 *self = match mem::replace(self, Listener::None) {
48 Listener::None => panic!("Polled a listener with no pollers"),
49 _ => Listener::Some(waker),
50 };
51 }
52
53 fn wake(&mut self) {
56 if let Listener::None = self {
57 return;
58 }
59 match mem::replace(self, Listener::New) {
60 Listener::None => panic!("Should have been polled"),
61 Listener::Some(waker) => waker.wake(),
62 Listener::New => {}
63 }
64 }
65
66 fn waker(&self) -> Option<&Waker> {
68 if let Listener::Some(waker) = self { Some(waker) } else { None }
69 }
70}
71
72impl Default for Listener {
73 fn default() -> Self {
74 Listener::None
75 }
76}
77
78struct OutputQueue {
80 listener: Listener,
82 queue: VecDeque<Packet>,
84 ended: bool,
87}
88
89impl OutputQueue {
90 fn enqueue(&mut self, packet: Packet) {
92 self.queue.push_back(packet);
93 self.listener.wake();
94 }
95
96 fn mark_ended(&mut self) {
99 self.ended = true;
100 self.listener.wake();
101 }
102
103 fn waker(&self) -> Option<&Waker> {
104 self.listener.waker()
105 }
106
107 fn wake(&mut self) {
109 self.listener.wake();
110 }
111}
112
113impl Default for OutputQueue {
114 fn default() -> Self {
115 OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
116 }
117}
118
119impl Stream for OutputQueue {
120 type Item = Packet;
121
122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 match self.queue.pop_front() {
124 Some(packet) => Poll::Ready(Some(packet)),
125 None if self.ended => Poll::Ready(None),
126 None => {
127 self.listener.register(cx.waker().clone());
128 Poll::Pending
129 }
130 }
131 }
132}
133
134const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
137const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
139
140#[derive(PartialEq, Eq, Hash, Clone, Debug)]
142struct InputBufferIndex(u32);
143
144struct StreamProcessorInner {
147 processor: StreamProcessorProxy,
149 sysmem_client: AllocatorProxy,
151 events: StreamProcessorEventStream,
153 input_packet_size: u64,
155 client_owned: HashSet<InputBufferIndex>,
158 input_cursor: Option<(InputBufferIndex, u64)>,
160 output_queue: Mutex<OutputQueue>,
164 input_waker: Option<Waker>,
166 input_allocation: MaybeDone<SysmemAllocation>,
168 output_allocation: MaybeDone<SysmemAllocation>,
170}
171
172impl StreamProcessorInner {
173 fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
177 match evt {
178 StreamProcessorEvent::OnInputConstraints { input_constraints } => {
179 let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
180 let buffer_constraints =
181 Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
182 let processor = self.processor.clone();
183 let mut partial_settings = Self::partial_settings();
184 let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
185 partial_settings.sysmem_token =
188 Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
189 token.into_channel(),
190 ));
191 if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
193 warn!("Couldn't set input buffer settings: {:?}", e);
194 }
195 };
196 self.input_allocation = maybe_done(SysmemAllocation::allocate(
197 self.sysmem_client.clone(),
198 BufferName { name: "StreamProcessorInput", priority: 1 },
199 None,
200 buffer_constraints,
201 token_fn,
202 )?);
203 }
204 StreamProcessorEvent::OnOutputConstraints { output_config } => {
205 let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
206 if !output_constraints.buffer_constraints_action_required {
207 return Ok(());
208 }
209 let buffer_constraints =
210 Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
211 let processor = self.processor.clone();
212 let mut partial_settings = Self::partial_settings();
213 let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
214 partial_settings.sysmem_token =
217 Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
218 token.into_channel(),
219 ));
220 if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
222 warn!("Couldn't set output buffer settings: {:?}", e);
223 }
224 };
225
226 self.output_allocation = maybe_done(SysmemAllocation::allocate(
227 self.sysmem_client.clone(),
228 BufferName { name: "StreamProcessorOutput", priority: 1 },
229 None,
230 buffer_constraints,
231 token_fn,
232 )?);
233 }
234 StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
235 let mut lock = self.output_queue.lock();
236 lock.enqueue(output_packet);
237 }
238 StreamProcessorEvent::OnFreeInputPacket {
239 free_input_packet: PacketHeader { packet_index: Some(idx), .. },
240 } => {
241 if !self.client_owned.insert(InputBufferIndex(idx)) {
242 warn!("Freed an input packet that was already freed: {:?}", idx);
243 }
244 self.setup_input_cursor();
245 }
246 StreamProcessorEvent::OnOutputEndOfStream { .. } => {
247 let mut lock = self.output_queue.lock();
248 lock.mark_ended();
249 }
250 StreamProcessorEvent::OnOutputFormat { .. } => {}
251 e => trace!("Unhandled stream processor event: {:?}", e),
252 }
253 Ok(())
254 }
255
256 fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
260 match ready!(self.events.poll_next_unpin(cx)) {
261 Some(Err(e)) => Poll::Ready(Err(e.into())),
262 Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
263 None => Poll::Ready(Err(format_err!("Client disconnected"))),
264 }
265 }
266
267 fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
268 BufferCollectionConstraints {
269 buffer_memory_constraints: Some(BufferMemoryConstraints {
270 min_size_bytes: Some(min_buffer_size as u64),
271 ..Default::default()
272 }),
273 ..buffer_collection_constraints_default()
274 }
275 }
276
277 fn partial_settings() -> StreamBufferPartialSettings {
278 StreamBufferPartialSettings {
279 buffer_lifetime_ordinal: Some(1),
280 buffer_constraints_version_ordinal: Some(1),
281 sysmem_token: None,
282 ..Default::default()
283 }
284 }
285
286 fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
287 Pin::new(&mut self.input_allocation)
288 .output_mut()
289 .expect("allocation completed")
290 .as_mut()
291 .expect("succcessful allocation")
292 }
293
294 fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
295 Pin::new(&mut self.output_allocation)
296 .output_mut()
297 .expect("allocation completed")
298 .as_mut()
299 .expect("succcessful allocation")
300 }
301
302 fn input_allocation_complete(&mut self) -> Result<(), Error> {
305 let _ = Pin::new(&mut self.input_allocation)
306 .output_mut()
307 .ok_or_else(|| format_err!("allocation isn't complete"))?;
308
309 let settings = self.input_buffers().settings();
310 self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
311 let buffer_count = self.input_buffers().len();
312 for i in 0..buffer_count {
313 let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
314 }
315 self.setup_input_cursor();
317 Ok(())
318 }
319
320 fn output_allocation_complete(&mut self) -> Result<(), Error> {
324 let _ = Pin::new(&mut self.output_allocation)
325 .output_mut()
326 .ok_or_else(|| format_err!("allocation isn't complete"))?;
327 self.processor
328 .complete_output_buffer_partial_settings(1)
329 .context("setting output buffer settings")?;
330 Ok(())
331 }
332
333 fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
336 if let MaybeDone::Future(_) = self.input_allocation {
337 match Pin::new(&mut self.input_allocation).poll(cx) {
338 Poll::Ready(()) => {
339 if let Err(e) = self.input_allocation_complete() {
340 return Poll::Ready(Err(e));
341 }
342 }
343 Poll::Pending => {}
344 };
345 }
346 if let MaybeDone::Future(_) = self.output_allocation {
347 match Pin::new(&mut self.output_allocation).poll(cx) {
348 Poll::Ready(()) => {
349 if let Err(e) = self.output_allocation_complete() {
350 return Poll::Ready(Err(e));
351 }
352 }
353 Poll::Pending => {}
354 };
355 }
356 Poll::Pending
357 }
358
359 fn waiting_waker(&self) -> Option<Waker> {
361 match (self.output_queue.lock().waker(), &self.input_waker) {
362 (None, None) => None,
364 (Some(waker), _) => Some(waker.clone()),
365 (_, Some(waker)) => Some(waker.clone()),
366 }
367 }
368
369 fn poll_events(&mut self) -> Result<(), Error> {
373 let waker = loop {
374 let waker = match self.waiting_waker() {
375 None => return Ok(()),
378 Some(waker) => waker,
379 };
380 match self.process_event(&mut Context::from_waker(&waker)) {
381 Poll::Pending => break waker,
382 Poll::Ready(Err(e)) => {
383 warn!("Stream processing error: {:?}", e);
384 return Err(e.into());
385 }
386 Poll::Ready(Ok(())) => {}
388 }
389 };
390
391 if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
392 warn!("Stream buffer allocation error: {:?}", e);
393 return Err(e.into());
394 }
395 Ok(())
396 }
397
398 fn wake_output(&mut self) {
399 self.output_queue.lock().wake();
400 }
401
402 fn wake_input(&mut self) {
403 if let Some(w) = self.input_waker.take() {
404 w.wake();
405 }
406 }
407
408 fn setup_input_cursor(&mut self) {
411 if self.input_cursor.is_some() {
412 return;
414 }
415 let next_idx = match self.client_owned.iter().next() {
416 None => return,
417 Some(idx) => idx.clone(),
418 };
419 let _ = self.client_owned.remove(&next_idx);
420 self.input_cursor = Some((next_idx, 0));
421 self.wake_input();
422 }
423
424 fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
427 let packet = ValidPacket::try_from(packet)?;
428
429 let output_size = packet.valid_length_bytes as usize;
430 let offset = packet.start_offset as u64;
431 let mut output = vec![0; output_size];
432 let buf_idx = packet.buffer_index;
433 let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
434 vmo.read(&mut output, offset)?;
435 self.processor.recycle_output_packet(&packet.header.into())?;
436 Ok(output)
437 }
438}
439
440pub struct StreamProcessor {
445 inner: Arc<RwLock<StreamProcessorInner>>,
446}
447
448pub struct StreamProcessorOutputStream {
451 inner: Arc<RwLock<StreamProcessorInner>>,
452}
453
454impl StreamProcessor {
455 fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
458 let events = processor.take_event_stream();
459 Self {
460 inner: Arc::new(RwLock::new(StreamProcessorInner {
461 processor,
462 sysmem_client,
463 events,
464 input_packet_size: 0,
465 client_owned: HashSet::new(),
466 input_cursor: None,
467 output_queue: Default::default(),
468 input_waker: None,
469 input_allocation: maybe_done(SysmemAllocation::pending()),
470 output_allocation: maybe_done(SysmemAllocation::pending()),
471 })),
472 }
473 }
474
475 pub fn create_encoder(
479 input_domain: DomainFormat,
480 encoder_settings: EncoderSettings,
481 ) -> Result<StreamProcessor, Error> {
482 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
483 .context("Connecting to sysmem")?;
484
485 let format_details = FormatDetails {
486 domain: Some(input_domain),
487 encoder_settings: Some(encoder_settings),
488 format_details_version_ordinal: Some(1),
489 mime_type: Some("audio/pcm".to_string()),
490 oob_bytes: None,
491 pass_through_parameters: None,
492 timebase: None,
493 ..Default::default()
494 };
495
496 let encoder_params = CreateEncoderParams {
497 input_details: Some(format_details),
498 require_hw: Some(false),
499 ..Default::default()
500 };
501
502 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
503 .context("Failed to connect to Codec Factory")?;
504
505 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
506
507 codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
508
509 Ok(StreamProcessor::create(processor, sysmem_client))
510 }
511
512 pub fn create_decoder(
516 mime_type: &str,
517 oob_bytes: Option<Vec<u8>>,
518 ) -> Result<StreamProcessor, Error> {
519 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
520 .context("Connecting to sysmem")?;
521
522 let format_details = FormatDetails {
523 mime_type: Some(mime_type.to_string()),
524 oob_bytes: oob_bytes,
525 format_details_version_ordinal: Some(1),
526 encoder_settings: None,
527 domain: None,
528 pass_through_parameters: None,
529 timebase: None,
530 ..Default::default()
531 };
532
533 let decoder_params = CreateDecoderParams {
534 input_details: Some(format_details),
535 permit_lack_of_split_header_handling: Some(true),
536 ..Default::default()
537 };
538
539 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
540 .context("Failed to connect to Codec Factory")?;
541
542 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
543
544 codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
545
546 Ok(StreamProcessor::create(processor, sysmem_client))
547 }
548
549 pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
553 {
554 let read = self.inner.read();
555 let mut lock = read.output_queue.lock();
556 if let Listener::None = lock.listener {
557 lock.listener = Listener::New;
558 } else {
559 return Err(format_err!("Output stream already taken"));
560 }
561 }
562 Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
563 }
564
565 fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
567 let mut bytes_idx = 0;
568 while bytes.len() > bytes_idx {
569 {
570 let mut write = self.inner.write();
571 let (idx, size) = match write.input_cursor.take() {
572 None => return Ok(bytes_idx),
573 Some(x) => x,
574 };
575 let space_left = write.input_packet_size - size;
576 let left_to_write = bytes.len() - bytes_idx;
577 let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
578 if space_left as usize > left_to_write {
579 let write_buf = &bytes[bytes_idx..];
580 let write_len = write_buf.len();
581 buffer_vmo.write(write_buf, size).map_err(|s| s.into_io_error())?;
582 bytes_idx += write_len;
583 write.input_cursor = Some((idx, size + write_len as u64));
584 assert!(bytes.len() == bytes_idx);
585 return Ok(bytes_idx);
586 }
587 let end_idx = bytes_idx + space_left as usize;
588 let write_buf = &bytes[bytes_idx..end_idx];
589 let write_len = write_buf.len();
590 buffer_vmo.write(write_buf, size).map_err(|s| s.into_io_error())?;
591 bytes_idx += write_len;
592 assert_eq!(size + write_len as u64, write.input_packet_size);
594 write.input_cursor = Some((idx, write.input_packet_size));
595 }
596 self.send_packet()?;
597 }
598 Ok(bytes_idx)
599 }
600
601 pub fn send_packet(&mut self) -> Result<(), io::Error> {
605 let mut write = self.inner.write();
606 if write.input_cursor.is_none() {
607 return Ok(());
609 }
610 let (idx, size) = write.input_cursor.take().expect("input cursor is none");
611 if size == 0 {
612 write.input_cursor = Some((idx, size));
614 return Ok(());
615 }
616 let packet = Packet {
617 header: Some(PacketHeader {
618 buffer_lifetime_ordinal: Some(1),
619 packet_index: Some(idx.0),
620 ..Default::default()
621 }),
622 buffer_index: Some(idx.0),
623 stream_lifetime_ordinal: Some(1),
624 start_offset: Some(0),
625 valid_length_bytes: Some(size as u32),
626 start_access_unit: Some(true),
627 known_end_access_unit: Some(true),
628 ..Default::default()
629 };
630 write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
631 write.setup_input_cursor();
633 Ok(())
634 }
635
636 fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
640 let mut write = self.inner.write();
641 write.input_waker = None;
644 if write.input_cursor.is_some() {
645 return Poll::Ready(Ok(()));
646 }
647 write.input_waker = Some(cx.waker().clone());
648 if let Err(e) = write.poll_events() {
653 return Poll::Ready(Err(io::Error::other(e)));
654 }
655 Poll::Pending
656 }
657
658 pub fn close(&mut self) -> Result<(), io::Error> {
659 self.send_packet()?;
660
661 let mut write = self.inner.write();
662
663 write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
664 write.input_cursor = None;
667 write.wake_input();
668 write.wake_output();
669 Ok(())
670 }
671}
672
673impl AsyncWrite for StreamProcessor {
674 fn poll_write(
675 mut self: Pin<&mut Self>,
676 cx: &mut Context<'_>,
677 buf: &[u8],
678 ) -> Poll<io::Result<usize>> {
679 ready!(self.poll_writable(cx))?;
680 match self.write_bytes(buf) {
681 Ok(written) => Poll::Ready(Ok(written)),
682 Err(e) => Poll::Ready(Err(e.into())),
683 }
684 }
685
686 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
687 Poll::Ready(self.send_packet())
688 }
689
690 fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
691 Poll::Ready(self.send_packet())
692 }
693}
694
695impl Stream for StreamProcessorOutputStream {
696 type Item = Result<Vec<u8>, Error>;
697
698 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
699 let mut write = self.inner.write();
700 let packet = {
702 let mut queue = write.output_queue.lock();
703 match queue.poll_next_unpin(cx) {
704 Poll::Ready(Some(packet)) => Some(Some(packet)),
705 Poll::Ready(None) => Some(None),
706 Poll::Pending => {
707 None
710 }
711 }
712 };
713 if let Err(e) = write.poll_events() {
716 return Poll::Ready(Some(Err(e.into())));
717 }
718 match packet {
719 Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
720 Some(None) => Poll::Ready(None),
721 None => Poll::Pending,
722 }
723 }
724}
725
726impl FusedStream for StreamProcessorOutputStream {
727 fn is_terminated(&self) -> bool {
728 self.inner.read().output_queue.lock().ended
729 }
730}
731
732#[cfg(test)]
733mod tests {
734 use super::*;
735
736 use async_test_helpers::run_while;
737 use byteorder::{ByteOrder, NativeEndian};
738 use fixture::fixture;
739 use fuchsia_async as fasync;
740 use futures::FutureExt;
741 use futures::io::AsyncWriteExt;
742 use futures_test::task::new_count_waker;
743 use sha2::{Digest as _, Sha256};
744 use std::fs::File;
745 use std::io::{Read, Write};
746 use std::pin::pin;
747
748 use stream_processor_test::ExpectedDigest;
749
750 const PCM_SAMPLE_SIZE: usize = 2;
751
752 #[derive(Clone, Debug)]
753 pub struct PcmAudio {
754 pcm_format: PcmFormat,
755 buffer: Vec<u8>,
756 }
757
758 impl PcmAudio {
759 pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
760 const FREQUENCY: f32 = 20.0;
761 const AMPLITUDE: f32 = 0.2;
762
763 let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
764 let samples_per_frame = pcm_format.channel_map.len();
765 let sample_count = frame_count * samples_per_frame;
766
767 let mut buffer = vec![0; frame_count * pcm_frame_size];
768
769 for i in 0..sample_count {
770 let frame = (i / samples_per_frame) as f32;
771 let value =
772 ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
773 let sample = (value * i16::max_value() as f32) as i16;
774
775 let mut sample_bytes = [0; std::mem::size_of::<i16>()];
776 NativeEndian::write_i16(&mut sample_bytes, sample);
777
778 let offset = i * PCM_SAMPLE_SIZE;
779 buffer[offset] = sample_bytes[0];
780 buffer[offset + 1] = sample_bytes[1];
781 }
782
783 Self { pcm_format, buffer }
784 }
785
786 pub fn frame_size(&self) -> usize {
787 self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
788 }
789 }
790
791 pub struct BytesValidator {
794 pub output_file: Option<&'static str>,
795 pub expected_digest: ExpectedDigest,
796 }
797
798 impl BytesValidator {
799 fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
800 let mut hasher = Sha256::default();
801
802 file.write_all(&bytes)?;
803 hasher.update(&bytes);
804
805 let digest: [u8; 32] = hasher.finalize().into();
806 if self.expected_digest.bytes != digest {
807 return Err(format_err!(
808 "Expected {}; got {}",
809 self.expected_digest,
810 hex::encode(digest)
811 ))
812 .into();
813 }
814
815 Ok(())
816 }
817
818 fn output_file(&self) -> Result<impl Write, Error> {
819 Ok(if let Some(file) = self.output_file {
820 Box::new(std::fs::File::create(file)?) as Box<dyn Write>
821 } else {
822 Box::new(std::io::sink()) as Box<dyn Write>
823 })
824 }
825
826 fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
827 self.write_and_hash(self.output_file()?, &bytes)
828 }
829 }
830
831 #[fuchsia::test]
832 fn encode_sbc() {
833 let mut exec = fasync::TestExecutor::new();
834
835 let pcm_format = PcmFormat {
836 pcm_mode: AudioPcmMode::Linear,
837 bits_per_sample: 16,
838 frames_per_second: 44100,
839 channel_map: vec![AudioChannelId::Cf],
840 };
841
842 let sub_bands = SbcSubBands::SubBands4;
843 let block_count = SbcBlockCount::BlockCount8;
844
845 let input_frames = 3000;
846
847 let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
848
849 let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
850 sub_bands,
851 block_count,
852 allocation: SbcAllocation::AllocLoudness,
853 channel_mode: SbcChannelMode::Mono,
854 bit_pool: 59, });
856
857 let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
858 AudioUncompressedFormat::Pcm(pcm_format),
859 ));
860
861 let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
862 .expect("to create Encoder");
863
864 let frames_per_packet: usize = 8; let packet_size = pcm_audio.frame_size() * frames_per_packet;
866 let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
867 let first_packet = packets.next().unwrap();
868
869 let written =
872 exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
873 assert_eq!(written, first_packet.len());
874
875 let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
876
877 assert!(encoder.take_output_stream().is_err());
879
880 let encoded_fut = pin!(encoded_stream.next());
883
884 let (waker, encoder_fut_wake_count) = new_count_waker();
885 let mut counting_ctx = Context::from_waker(&waker);
886
887 assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
888
889 let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
890
891 for packet in packets {
892 let mut written_fut = encoder.write(&packet);
893
894 let written_bytes =
895 exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
896
897 assert_eq!(packet.len(), written_bytes);
898 frames_sent += packet.len() / pcm_audio.frame_size();
899 }
900
901 encoder.close().expect("stream should always be closable");
902
903 assert_eq!(input_frames, frames_sent);
904
905 let woke_count = encoder_fut_wake_count.get();
908 while encoder_fut_wake_count.get() == woke_count {
909 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
910 }
911 assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
912
913 let mut encoded = Vec::new();
915
916 loop {
917 let mut encoded_fut = encoded_stream.next();
918
919 match exec.run_singlethreaded(&mut encoded_fut) {
920 Some(Ok(enc_data)) => {
921 assert!(!enc_data.is_empty());
922 encoded.extend_from_slice(&enc_data);
923 }
924 Some(Err(e)) => {
925 panic!("Unexpected error when polling encoded data: {}", e);
926 }
927 None => {
928 break;
929 }
930 }
931 }
932
933 let expected_digest = ExpectedDigest::new(
935 "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
936 "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
937 );
938 let hash_validator = BytesValidator { output_file: None, expected_digest };
939
940 assert_eq!(6110, encoded.len(), "Encoded size should be equal");
941
942 let validated = hash_validator.validate(encoded.as_slice());
943 assert!(validated.is_ok(), "Failed hash: {:?}", validated);
944 }
945
946 fn fix_sbc_test_file<F>(_name: &str, test: F)
947 where
948 F: FnOnce(Vec<u8>) -> (),
949 {
950 const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
951
952 let mut sbc_data = Vec::new();
953 let _ = File::open(SBC_TEST_FILE)
954 .expect("open test file")
955 .read_to_end(&mut sbc_data)
956 .expect("read test file");
957
958 test(sbc_data)
959 }
960
961 #[fixture(fix_sbc_test_file)]
962 #[fuchsia::test]
963 fn decode_sbc(sbc_data: Vec<u8>) {
964 let mut exec = fasync::TestExecutor::new();
965
966 const SBC_FRAME_SIZE: usize = 72;
967 const INPUT_FRAMES: usize = 23;
968
969 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
971 let mut decoder =
972 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
973
974 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
975
976 assert!(decoder.take_output_stream().is_err());
978
979 let mut frames_sent = 0;
980
981 let frames_per_packet: usize = 1; let packet_size = SBC_FRAME_SIZE * frames_per_packet;
983
984 for frames in sbc_data.as_slice().chunks(packet_size) {
985 let mut written_fut = decoder.write(&frames);
986
987 let written_bytes =
988 exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
989
990 assert_eq!(frames.len(), written_bytes);
991 frames_sent += frames.len() / SBC_FRAME_SIZE;
992 }
993
994 assert_eq!(INPUT_FRAMES, frames_sent);
995
996 let mut flush_fut = pin!(decoder.flush());
997 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
998
999 decoder.close().expect("stream should always be closable");
1000
1001 let mut decoded = Vec::new();
1003
1004 loop {
1005 let mut decoded_fut = decoded_stream.next();
1006
1007 match exec.run_singlethreaded(&mut decoded_fut) {
1008 Some(Ok(dec_data)) => {
1009 assert!(!dec_data.is_empty());
1010 decoded.extend_from_slice(&dec_data);
1011 }
1012 Some(Err(e)) => {
1013 panic!("Unexpected error when polling decoded data: {}", e);
1014 }
1015 None => {
1016 break;
1017 }
1018 }
1019 }
1020
1021 let expected_digest = ExpectedDigest::new(
1023 "Pcm: 44.1kHz/16bit/Mono",
1024 "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
1025 );
1026 let hash_validator = BytesValidator { output_file: None, expected_digest };
1027
1028 assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
1029
1030 let validated = hash_validator.validate(decoded.as_slice());
1031 assert!(validated.is_ok(), "Failed hash: {:?}", validated);
1032 }
1033
1034 #[fixture(fix_sbc_test_file)]
1035 #[fuchsia::test]
1036 fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
1037 let mut exec = fasync::TestExecutor::new();
1038 const SBC_FRAME_SIZE: usize = 72;
1039
1040 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1042 let mut decoder =
1043 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1044
1045 let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1046 let next_frame = chunks.next().unwrap();
1047
1048 let written =
1051 exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
1052 assert_eq!(written, next_frame.len());
1053
1054 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1055
1056 let decoded_fut = pin!(decoded_stream.next());
1059
1060 let (waker, decoder_fut_wake_count) = new_count_waker();
1061 let mut counting_ctx = Context::from_waker(&waker);
1062
1063 assert!(decoded_fut.poll(&mut counting_ctx).is_pending());
1064
1065 let frame = chunks.next().unwrap();
1068 let mut written_fut = decoder.write(&frame);
1069 let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
1070 assert_eq!(frame.len(), written_bytes);
1071
1072 let mut flush_fut = pin!(decoder.flush());
1073 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1074
1075 assert_eq!(decoder_fut_wake_count.get(), 0);
1078 while decoder_fut_wake_count.get() == 0 {
1079 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1080 }
1081 assert_eq!(decoder_fut_wake_count.get(), 1);
1082
1083 let mut decoded = Vec::new();
1084 let mut decoded_fut = decoded_stream.next();
1086
1087 match exec.run_singlethreaded(&mut decoded_fut) {
1088 Some(Ok(dec_data)) => {
1089 assert!(!dec_data.is_empty());
1090 decoded.extend_from_slice(&dec_data);
1091 }
1092 x => panic!("Expected decoded frame, got {:?}", x),
1093 }
1094
1095 assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
1096 }
1097
1098 #[fixture(fix_sbc_test_file)]
1099 #[fuchsia::test]
1100 fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
1101 let mut exec = fasync::TestExecutor::new();
1102 const SBC_FRAME_SIZE: usize = 72;
1103
1104 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1106 let mut decoder =
1107 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1108
1109 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1110
1111 let decoded_fut = pin!(decoded_stream.next());
1112
1113 let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE).cycle();
1114 let next_frame = chunks.next().unwrap();
1115
1116 let (written_res, mut decoded_fut) =
1122 run_while(&mut exec, decoded_fut, decoder.write(next_frame));
1123 assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());
1124
1125 let (waker, write_fut_wake_count) = new_count_waker();
1129 let mut counting_ctx = Context::from_waker(&waker);
1130
1131 let mut wake_count_before_stall = 0;
1132 for frame in chunks {
1133 wake_count_before_stall = write_fut_wake_count.get();
1134 let mut written_fut = decoder.write(&frame);
1135 if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1136 if write_fut_wake_count.get() != wake_count_before_stall {
1139 continue;
1140 }
1141 break;
1144 }
1145 let mut flush_fut = pin!(decoder.flush());
1147 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1148 }
1149
1150 let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
1152 assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");
1153
1154 let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE).cycle();
1156 for frame in chunks {
1157 wake_count_before_stall = write_fut_wake_count.get();
1158 let mut written_fut = decoder.write(&frame);
1159 if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1160 if write_fut_wake_count.get() != wake_count_before_stall {
1163 continue;
1164 }
1165 break;
1166 }
1167 let mut flush_fut = pin!(decoder.flush());
1169 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1170 }
1171
1172 while write_fut_wake_count.get() == wake_count_before_stall {
1177 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1178 }
1179
1180 }
1183}